// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/writeback.h>
#include <linux/iversion.h>
#include <linux/filelock.h>
#include <linux/jiffies.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "crypto.h"
#include <linux/ceph/decode.h>
#include <linux/ceph/messenger.h>
/*
 * Capability management
 *
 * The Ceph metadata servers control client access to inode metadata
 * and file data by issuing capabilities, granting clients permission
 * to read and/or write both inode fields and file data to OSDs
 * (storage nodes).  Each capability consists of a set of bits
 * indicating which operations are allowed.
 *
 * If the client holds a *_SHARED cap, the client has a coherent value
 * that can be safely read from the cached inode.
 *
 * In the case of *_EXCL (exclusive) or FILE_WR capabilities, the
 * client is allowed to change inode attributes (e.g., file size,
 * mtime), note its dirty state in the ceph_cap, and asynchronously
 * flush that metadata change to the MDS.
 *
 * In the event of a conflicting operation (perhaps by another
 * client), the MDS will revoke the conflicting client capabilities.
 *
 * In order for a client to cache an inode, it must hold a capability
 * with at least one MDS server.  When inodes are released, release
 * notifications are batched and periodically sent en masse to the
 * MDS cluster to release server state.
 */
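/*
 * Illustrative note (not part of the original header): each cap field is a
 * small group of generic bits (CEPH_CAP_GSHARED, _GEXCL, _GCACHE, _GRD,
 * _GWR, _GBUFFER, ...) shifted into a per-field position (CEPH_CAP_SAUTH,
 * _SLINK, _SXATTR, _SFILE).  For example, CEPH_CAP_FILE_RD is
 * CEPH_CAP_GRD << CEPH_CAP_SFILE.  gcap_string()/ceph_cap_string() below
 * decode exactly this layout.
 */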
static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session,
				 struct ceph_inode_info *ci,
				 u64 oldest_flush_tid);
/*
 * Generate readable cap strings for debugging output.
 */
#define MAX_CAP_STR 20
static char cap_str[MAX_CAP_STR][40];
static DEFINE_SPINLOCK(cap_str_lock);
static int last_cap_str;
static char *gcap_string(char *s, int c)
{
	if (c & CEPH_CAP_GSHARED)
		*s++ = 's';
	if (c & CEPH_CAP_GEXCL)
		*s++ = 'x';
	if (c & CEPH_CAP_GCACHE)
		*s++ = 'c';
	if (c & CEPH_CAP_GRD)
		*s++ = 'r';
	if (c & CEPH_CAP_GWR)
		*s++ = 'w';
	if (c & CEPH_CAP_GBUFFER)
		*s++ = 'b';
	if (c & CEPH_CAP_GWREXTEND)
		*s++ = 'a';
	if (c & CEPH_CAP_GLAZYIO)
		*s++ = 'l';
	return s;
}
const char *ceph_cap_string(int caps)
{
	int i;
	char *s;
	int c;

	spin_lock(&cap_str_lock);
	i = last_cap_str++;
	if (last_cap_str == MAX_CAP_STR)
		last_cap_str = 0;
	spin_unlock(&cap_str_lock);

	s = cap_str[i];

	if (caps & CEPH_CAP_PIN)
		*s++ = 'p';

	c = (caps >> CEPH_CAP_SAUTH) & 3;
	if (c) {
		*s++ = 'A';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SLINK) & 3;
	if (c) {
		*s++ = 'L';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SXATTR) & 3;
	if (c) {
		*s++ = 'X';
		s = gcap_string(s, c);
	}

	c = caps >> CEPH_CAP_SFILE;
	if (c) {
		*s++ = 'F';
		s = gcap_string(s, c);
	}

	*s = 0;
	return cap_str[i];
}
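/*
 * Example (illustrative): ceph_cap_string(CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED
 * | CEPH_CAP_FILE_RD) renders as "pFsr" -- 'p' for the pin, 'F' introducing
 * the file field, then 's' (shared) and 'r' (read) from gcap_string().
 */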
void ceph_caps_init(struct ceph_mds_client *mdsc)
{
	INIT_LIST_HEAD(&mdsc->caps_list);
	spin_lock_init(&mdsc->caps_list_lock);
}
void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
	struct ceph_cap *cap;

	spin_lock(&mdsc->caps_list_lock);
	while (!list_empty(&mdsc->caps_list)) {
		cap = list_first_entry(&mdsc->caps_list,
				       struct ceph_cap, caps_item);
		list_del(&cap->caps_item);
		kmem_cache_free(ceph_cap_cachep, cap);
	}
	mdsc->caps_total_count = 0;
	mdsc->caps_avail_count = 0;
	mdsc->caps_use_count = 0;
	mdsc->caps_reserve_count = 0;
	mdsc->caps_min_count = 0;
	spin_unlock(&mdsc->caps_list_lock);
}
void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
			      struct ceph_mount_options *fsopt)
{
	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_min_count = fsopt->max_readdir;
	if (mdsc->caps_min_count < 1024)
		mdsc->caps_min_count = 1024;
	mdsc->caps_use_max = fsopt->caps_max;
	if (mdsc->caps_use_max > 0 &&
	    mdsc->caps_use_max < mdsc->caps_min_count)
		mdsc->caps_use_max = mdsc->caps_min_count;
	spin_unlock(&mdsc->caps_list_lock);
}
static void __ceph_unreserve_caps(struct ceph_mds_client *mdsc, int nr_caps)
{
	struct ceph_cap *cap;
	int i;

	if (nr_caps) {
		BUG_ON(mdsc->caps_reserve_count < nr_caps);
		mdsc->caps_reserve_count -= nr_caps;
		if (mdsc->caps_avail_count >=
		    mdsc->caps_reserve_count + mdsc->caps_min_count) {
			mdsc->caps_total_count -= nr_caps;
			for (i = 0; i < nr_caps; i++) {
				cap = list_first_entry(&mdsc->caps_list,
					struct ceph_cap, caps_item);
				list_del(&cap->caps_item);
				kmem_cache_free(ceph_cap_cachep, cap);
			}
		} else {
			mdsc->caps_avail_count += nr_caps;
		}

		doutc(mdsc->fsc->client,
		      "caps %d = %d used + %d resv + %d avail\n",
		      mdsc->caps_total_count, mdsc->caps_use_count,
		      mdsc->caps_reserve_count, mdsc->caps_avail_count);
		BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
						 mdsc->caps_reserve_count +
						 mdsc->caps_avail_count);
	}
}
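/*
 * Note on the accounting above: the caps_list_lock-protected counters must
 * always satisfy
 *
 *	caps_total_count == caps_use_count + caps_reserve_count +
 *			    caps_avail_count
 *
 * which is why every path that moves caps between the used, reserved and
 * available pools re-asserts this invariant with BUG_ON().
 */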
/*
 * Called under mdsc->mutex.
 */
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
		      struct ceph_cap_reservation *ctx, int need)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int i, j;
	struct ceph_cap *cap;
	int have;
	int alloc = 0;
	int max_caps;
	int err = 0;
	bool trimmed = false;
	struct ceph_mds_session *s;
	LIST_HEAD(newcaps);

	doutc(cl, "ctx=%p need=%d\n", ctx, need);

	/* first reserve any caps that are already allocated */
	spin_lock(&mdsc->caps_list_lock);
	if (mdsc->caps_avail_count >= need)
		have = need;
	else
		have = mdsc->caps_avail_count;
	mdsc->caps_avail_count -= have;
	mdsc->caps_reserve_count += have;
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);

	for (i = have; i < need; ) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			list_add(&cap->caps_item, &newcaps);
			alloc++;
			i++;
			continue;
		}

		if (!trimmed) {
			for (j = 0; j < mdsc->max_sessions; j++) {
				s = __ceph_lookup_mds_session(mdsc, j);
				if (!s)
					continue;
				mutex_unlock(&mdsc->mutex);

				mutex_lock(&s->s_mutex);
				max_caps = s->s_nr_caps - (need - i);
				ceph_trim_caps(mdsc, s, max_caps);
				mutex_unlock(&s->s_mutex);

				ceph_put_mds_session(s);
				mutex_lock(&mdsc->mutex);
			}
			trimmed = true;

			spin_lock(&mdsc->caps_list_lock);
			if (mdsc->caps_avail_count) {
				int more_have;

				if (mdsc->caps_avail_count >= need - i)
					more_have = need - i;
				else
					more_have = mdsc->caps_avail_count;

				i += more_have;
				have += more_have;
				mdsc->caps_avail_count -= more_have;
				mdsc->caps_reserve_count += more_have;
			}
			spin_unlock(&mdsc->caps_list_lock);

			continue;
		}

		pr_warn_client(cl, "ctx=%p ENOMEM need=%d got=%d\n", ctx, need,
			       have + alloc);
		err = -ENOMEM;
		break;
	}

	if (!err) {
		BUG_ON(have + alloc != need);
		ctx->count = need;
		ctx->used = 0;
	}

	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_total_count += alloc;
	mdsc->caps_reserve_count += alloc;
	list_splice(&newcaps, &mdsc->caps_list);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);

	if (err)
		__ceph_unreserve_caps(mdsc, have + alloc);

	spin_unlock(&mdsc->caps_list_lock);

	doutc(cl, "ctx=%p %d = %d used + %d resv + %d avail\n", ctx,
	      mdsc->caps_total_count, mdsc->caps_use_count,
	      mdsc->caps_reserve_count, mdsc->caps_avail_count);
	return err;
}
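/*
 * Typical usage (a sketch; see the callers in mds_client.c): a reservation
 * context is filled up front for the expected number of caps, individual
 * caps are then taken from it with ceph_get_cap(mdsc, ctx), and whatever
 * was not consumed is returned via ceph_unreserve_caps() below.
 */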
void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
			 struct ceph_cap_reservation *ctx)
{
	struct ceph_client *cl = mdsc->fsc->client;
	bool reclaim = false;

	if (!ctx->count)
		return;

	doutc(cl, "ctx=%p count=%d\n", ctx, ctx->count);
	spin_lock(&mdsc->caps_list_lock);
	__ceph_unreserve_caps(mdsc, ctx->count);
	ctx->count = 0;

	if (mdsc->caps_use_max > 0 &&
	    mdsc->caps_use_count > mdsc->caps_use_max)
		reclaim = true;
	spin_unlock(&mdsc->caps_list_lock);

	if (reclaim)
		ceph_reclaim_caps_nr(mdsc, ctx->used);
}
struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
			      struct ceph_cap_reservation *ctx)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap *cap = NULL;

	/* temporary, until we do something about cap import/export */
	if (!ctx) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			spin_lock(&mdsc->caps_list_lock);
			mdsc->caps_use_count++;
			mdsc->caps_total_count++;
			spin_unlock(&mdsc->caps_list_lock);
		} else {
			spin_lock(&mdsc->caps_list_lock);
			if (mdsc->caps_avail_count) {
				BUG_ON(list_empty(&mdsc->caps_list));

				mdsc->caps_avail_count--;
				mdsc->caps_use_count++;
				cap = list_first_entry(&mdsc->caps_list,
						struct ceph_cap, caps_item);
				list_del(&cap->caps_item);

				BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
				       mdsc->caps_reserve_count + mdsc->caps_avail_count);
			}
			spin_unlock(&mdsc->caps_list_lock);
		}

		return cap;
	}

	spin_lock(&mdsc->caps_list_lock);
	doutc(cl, "ctx=%p (%d) %d = %d used + %d resv + %d avail\n", ctx,
	      ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
	      mdsc->caps_reserve_count, mdsc->caps_avail_count);
	BUG_ON(!ctx->count);
	BUG_ON(ctx->count > mdsc->caps_reserve_count);
	BUG_ON(list_empty(&mdsc->caps_list));

	ctx->count--;
	ctx->used++;
	mdsc->caps_reserve_count--;
	mdsc->caps_use_count++;

	cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
	list_del(&cap->caps_item);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
	return cap;
}
void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
	struct ceph_client *cl = mdsc->fsc->client;

	spin_lock(&mdsc->caps_list_lock);
	doutc(cl, "%p %d = %d used + %d resv + %d avail\n", cap,
	      mdsc->caps_total_count, mdsc->caps_use_count,
	      mdsc->caps_reserve_count, mdsc->caps_avail_count);
	mdsc->caps_use_count--;
	/*
	 * Keep some preallocated caps around (ceph_min_count), to
	 * avoid lots of free/alloc churn.
	 */
	if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
				      mdsc->caps_min_count) {
		mdsc->caps_total_count--;
		kmem_cache_free(ceph_cap_cachep, cap);
	} else {
		mdsc->caps_avail_count++;
		list_add(&cap->caps_item, &mdsc->caps_list);
	}

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
}
void ceph_reservation_status(struct ceph_fs_client *fsc,
			     int *total, int *avail, int *used, int *reserved,
			     int *min)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	spin_lock(&mdsc->caps_list_lock);

	if (total)
		*total = mdsc->caps_total_count;
	if (avail)
		*avail = mdsc->caps_avail_count;
	if (used)
		*used = mdsc->caps_use_count;
	if (reserved)
		*reserved = mdsc->caps_reserve_count;
	if (min)
		*min = mdsc->caps_min_count;

	spin_unlock(&mdsc->caps_list_lock);
}
/*
 * Find ceph_cap for given mds, if any.
 *
 * Called with i_ceph_lock held.
 */
struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;
	struct rb_node *n = ci->i_caps.rb_node;

	while (n) {
		cap = rb_entry(n, struct ceph_cap, ci_node);
		if (mds < cap->mds)
			n = n->rb_left;
		else if (mds > cap->mds)
			n = n->rb_right;
		else
			return cap;
	}
	return NULL;
}
struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	spin_unlock(&ci->i_ceph_lock);
	return cap;
}
/*
 * Called under i_ceph_lock.
 */
static void __insert_cap_node(struct ceph_inode_info *ci,
			      struct ceph_cap *new)
{
	struct rb_node **p = &ci->i_caps.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_cap *cap = NULL;

	while (*p) {
		parent = *p;
		cap = rb_entry(parent, struct ceph_cap, ci_node);
		if (new->mds < cap->mds)
			p = &(*p)->rb_left;
		else if (new->mds > cap->mds)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->ci_node, parent, p);
	rb_insert_color(&new->ci_node, &ci->i_caps);
}
/*
 * (re)set cap hold timeouts, which control the delayed release
 * of unused caps back to the MDS.  Should be called on cap use.
 */
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mount_options *opt = mdsc->fsc->mount_options;

	ci->i_hold_caps_max = round_jiffies(jiffies +
					    opt->caps_wanted_delay_max * HZ);
	doutc(mdsc->fsc->client, "%p %llx.%llx %lu\n", inode,
	      ceph_vinop(inode), ci->i_hold_caps_max - jiffies);
}
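/*
 * For instance, with the default caps_wanted_delay_max mount option
 * (60 seconds at the time of writing), a cap touched now will not be
 * considered for delayed release for roughly the next minute.
 */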
/*
 * (Re)queue cap at the end of the delayed cap release list.
 *
 * If I_FLUSH is set, leave the inode at the front of the list.
 *
 * Caller holds i_ceph_lock
 *    -> we take mdsc->cap_delay_lock
 */
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
				struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;

	doutc(mdsc->fsc->client, "%p %llx.%llx flags 0x%lx at %lu\n",
	      inode, ceph_vinop(inode), ci->i_ceph_flags,
	      ci->i_hold_caps_max);
	if (!mdsc->stopping) {
		spin_lock(&mdsc->cap_delay_lock);
		if (!list_empty(&ci->i_cap_delay_list)) {
			if (ci->i_ceph_flags & CEPH_I_FLUSH)
				goto no_change;
			list_del_init(&ci->i_cap_delay_list);
		}
		__cap_set_timeouts(mdsc, ci);
		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
		spin_unlock(&mdsc->cap_delay_lock);
	}
}
/*
 * Queue an inode for immediate writeback.  Mark inode with I_FLUSH,
 * indicating we should send a cap message to flush dirty metadata
 * asap, and move to the front of the delayed cap list.
 */
static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
				      struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;

	doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
	spin_lock(&mdsc->cap_delay_lock);
	ci->i_ceph_flags |= CEPH_I_FLUSH;
	if (!list_empty(&ci->i_cap_delay_list))
		list_del_init(&ci->i_cap_delay_list);
	list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}
/*
 * Cancel delayed work on cap.
 *
 * Caller must hold i_ceph_lock.
 */
static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;

	doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
	if (list_empty(&ci->i_cap_delay_list))
		return;
	spin_lock(&mdsc->cap_delay_lock);
	list_del_init(&ci->i_cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}
/* Common issue checks for add_cap, handle_cap_grant. */
static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
			      unsigned issued)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);

	unsigned had = __ceph_caps_issued(ci, NULL);

	lockdep_assert_held(&ci->i_ceph_lock);

	/*
	 * Each time we receive FILE_CACHE anew, we increment
	 * i_rdcache_gen.
	 */
	if (S_ISREG(ci->netfs.inode.i_mode) &&
	    (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
		ci->i_rdcache_gen++;
	}

	/*
	 * If FILE_SHARED is newly issued, mark dir not complete. We don't
	 * know what happened to this directory while we didn't have the cap.
	 * If FILE_SHARED is being revoked, also mark dir not complete. It
	 * stops on-going cached readdir.
	 */
	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
		if (issued & CEPH_CAP_FILE_SHARED)
			atomic_inc(&ci->i_shared_gen);
		if (S_ISDIR(ci->netfs.inode.i_mode)) {
			doutc(cl, " marking %p NOT complete\n", inode);
			__ceph_dir_clear_complete(ci);
		}
	}

	/* Wipe saved layout if we're losing DIR_CREATE caps */
	if (S_ISDIR(ci->netfs.inode.i_mode) && (had & CEPH_CAP_DIR_CREATE) &&
	    !(issued & CEPH_CAP_DIR_CREATE)) {
		ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
		memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
	}
}
/**
 * change_auth_cap_ses - move inode to appropriate lists when auth caps change
 * @ci: inode to be moved
 * @session: new auth caps session
 */
void change_auth_cap_ses(struct ceph_inode_info *ci,
			 struct ceph_mds_session *session)
{
	lockdep_assert_held(&ci->i_ceph_lock);

	if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item))
		return;

	spin_lock(&session->s_mdsc->cap_dirty_lock);
	if (!list_empty(&ci->i_dirty_item))
		list_move(&ci->i_dirty_item, &session->s_cap_dirty);
	if (!list_empty(&ci->i_flushing_item))
		list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
	spin_unlock(&session->s_mdsc->cap_dirty_lock);
}
/*
 * Add a capability under the given MDS session.
 *
 * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock
 *
 * @fmode is the open file mode, if we are opening a file, otherwise
 * it is < 0.  (This is so we can atomically add the cap and add an
 * open file reference to it.)
 */
void ceph_add_cap(struct inode *inode,
		  struct ceph_mds_session *session, u64 cap_id,
		  unsigned issued, unsigned wanted,
		  unsigned seq, unsigned mseq, u64 realmino, int flags,
		  struct ceph_cap **new_cap)
{
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	int mds = session->s_mds;
	int actual_wanted;
	u32 gen;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "%p %llx.%llx mds%d cap %llx %s seq %d\n", inode,
	      ceph_vinop(inode), session->s_mds, cap_id,
	      ceph_cap_string(issued), seq);

	gen = atomic_read(&session->s_cap_gen);

	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		cap = *new_cap;
		*new_cap = NULL;

		cap->issued = 0;
		cap->implemented = 0;
		cap->mds = mds;
		cap->mds_wanted = 0;
		cap->mseq = 0;

		cap->ci = ci;
		__insert_cap_node(ci, cap);

		/* add to session cap list */
		cap->session = session;
		spin_lock(&session->s_cap_lock);
		list_add_tail(&cap->session_caps, &session->s_caps);
		session->s_nr_caps++;
		atomic64_inc(&mdsc->metric.total_caps);
		spin_unlock(&session->s_cap_lock);
	} else {
		spin_lock(&session->s_cap_lock);
		list_move_tail(&cap->session_caps, &session->s_caps);
		spin_unlock(&session->s_cap_lock);

		if (cap->cap_gen < gen)
			cap->issued = cap->implemented = CEPH_CAP_PIN;

		/*
		 * auth mds of the inode changed. we received the cap export
		 * message, but still haven't received the cap import message.
		 * handle_cap_export() updated the new auth MDS' cap.
		 *
		 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
		 * a message that was sent before the cap import message. So
		 * don't remove caps.
		 */
		if (ceph_seq_cmp(seq, cap->seq) <= 0) {
			WARN_ON(cap != ci->i_auth_cap);
			WARN_ON(cap->cap_id != cap_id);
			seq = cap->seq;
			mseq = cap->mseq;
			issued |= cap->issued;
			flags |= CEPH_CAP_FLAG_AUTH;
		}
	}

	if (!ci->i_snap_realm ||
	    ((flags & CEPH_CAP_FLAG_AUTH) &&
	     realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
		/*
		 * add this inode to the appropriate snap realm
		 */
		struct ceph_snap_realm *realm =
			ceph_lookup_snap_realm(mdsc, realmino);
		if (realm)
			ceph_change_snap_realm(inode, realm);
		else
			WARN(1, "%s: couldn't find snap realm 0x%llx (ino 0x%llx oldrealm 0x%llx)\n",
			     __func__, realmino, ci->i_vino.ino,
			     ci->i_snap_realm ? ci->i_snap_realm->ino : 0);
	}

	__check_cap_issue(ci, cap, issued);

	/*
	 * If we are issued caps we don't want, or the mds' wanted
	 * value appears to be off, queue a check so we'll release
	 * later and/or update the mds wanted value.
	 */
	actual_wanted = __ceph_caps_wanted(ci);
	if ((wanted & ~actual_wanted) ||
	    (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
		doutc(cl, "issued %s, mds wanted %s, actual %s, queueing\n",
		      ceph_cap_string(issued), ceph_cap_string(wanted),
		      ceph_cap_string(actual_wanted));
		__cap_delay_requeue(mdsc, ci);
	}

	if (flags & CEPH_CAP_FLAG_AUTH) {
		if (!ci->i_auth_cap ||
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
			if (ci->i_auth_cap &&
			    ci->i_auth_cap->session != cap->session)
				change_auth_cap_ses(ci, cap->session);
			ci->i_auth_cap = cap;
			cap->mds_wanted = wanted;
		}
	} else {
		WARN_ON(ci->i_auth_cap == cap);
	}

	doutc(cl, "inode %p %llx.%llx cap %p %s now %s seq %d mds%d\n",
	      inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
	      ceph_cap_string(issued|cap->issued), seq, mds);
	cap->cap_id = cap_id;
	cap->issued = issued;
	cap->implemented |= issued;
	if (ceph_seq_cmp(mseq, cap->mseq) > 0)
		cap->mds_wanted = wanted;
	else
		cap->mds_wanted |= wanted;
	cap->seq = seq;
	cap->issue_seq = seq;
	cap->mseq = mseq;
	cap->cap_gen = gen;
	wake_up_all(&ci->i_cap_wq);
}
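/*
 * A brief note on the three sequence numbers handled above: cap->seq
 * advances with every grant/revoke message for the cap, cap->issue_seq
 * records the seq at the time of the most recent issue, and cap->mseq
 * counts cap migrations between MDSs; ceph_seq_cmp() compares them with
 * wraparound in mind.
 */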
/*
 * Return true if cap has not timed out and belongs to the current
 * generation of the MDS session (i.e. has not gone 'stale' due to
 * us losing touch with the mds).
 */
static int __cap_is_valid(struct ceph_cap *cap)
{
	struct inode *inode = &cap->ci->netfs.inode;
	struct ceph_client *cl = cap->session->s_mdsc->fsc->client;
	unsigned long ttl;
	u32 gen;

	gen = atomic_read(&cap->session->s_cap_gen);
	ttl = cap->session->s_cap_ttl;

	if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
		doutc(cl, "%p %llx.%llx cap %p issued %s but STALE (gen %u vs %u)\n",
		      inode, ceph_vinop(inode), cap,
		      ceph_cap_string(cap->issued), cap->cap_gen, gen);
		return 0;
	}

	return 1;
}
/*
 * Return set of valid cap bits issued to us.  Note that caps time
 * out, and may be invalidated in bulk if the client session times out
 * and session->s_cap_gen is bumped.
 */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	if (implemented)
		*implemented = 0;
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		doutc(cl, "%p %llx.%llx cap %p issued %s\n", inode,
		      ceph_vinop(inode), cap, ceph_cap_string(cap->issued));
		have |= cap->issued;
		if (implemented)
			*implemented |= cap->implemented;
	}
	/*
	 * exclude caps issued by non-auth MDS, but are being revoked
	 * by the auth MDS. The non-auth MDS should be revoking/exporting
	 * these caps, but the message is delayed.
	 */
	if (ci->i_auth_cap) {
		cap = ci->i_auth_cap;
		have &= ~cap->implemented | cap->issued;
	}
	return have;
}
/*
 * Get cap bits issued by caps other than @ocap
 */
int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
{
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (cap == ocap)
			continue;
		if (!__cap_is_valid(cap))
			continue;
		have |= cap->issued;
	}
	return have;
}
/*
 * Move a cap to the end of the LRU (oldest caps at list head, newest
 * at list tail).
 */
static void __touch_cap(struct ceph_cap *cap)
{
	struct inode *inode = &cap->ci->netfs.inode;
	struct ceph_mds_session *s = cap->session;
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	spin_lock(&s->s_cap_lock);
	if (!s->s_cap_iterator) {
		doutc(cl, "%p %llx.%llx cap %p mds%d\n", inode,
		      ceph_vinop(inode), cap, s->s_mds);
		list_move_tail(&cap->session_caps, &s->s_caps);
	} else {
		doutc(cl, "%p %llx.%llx cap %p mds%d NOP, iterating over caps\n",
		      inode, ceph_vinop(inode), cap, s->s_mds);
	}
	spin_unlock(&s->s_cap_lock);
}
/*
 * Check if we hold the given mask.  If so, move the cap(s) to the
 * front of their respective LRUs.  (This is the preferred way for
 * callers to check for caps they want.)
 */
int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap *cap;
	struct rb_node *p;
	int have = ci->i_snap_caps;

	if ((have & mask) == mask) {
		doutc(cl, "mask %p %llx.%llx snap issued %s (mask %s)\n",
		      inode, ceph_vinop(inode), ceph_cap_string(have),
		      ceph_cap_string(mask));
		return 1;
	}

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		if ((cap->issued & mask) == mask) {
			doutc(cl, "mask %p %llx.%llx cap %p issued %s (mask %s)\n",
			      inode, ceph_vinop(inode), cap,
			      ceph_cap_string(cap->issued),
			      ceph_cap_string(mask));
			if (touch)
				__touch_cap(cap);
			return 1;
		}

		/* does a combination of caps satisfy mask? */
		have |= cap->issued;
		if ((have & mask) == mask) {
			doutc(cl, "mask %p %llx.%llx combo issued %s (mask %s)\n",
			      inode, ceph_vinop(inode),
			      ceph_cap_string(cap->issued),
			      ceph_cap_string(mask));
			if (touch) {
				struct rb_node *q;

				/* touch this + preceding caps */
				__touch_cap(cap);
				for (q = rb_first(&ci->i_caps); q != p;
				     q = rb_next(q)) {
					cap = rb_entry(q, struct ceph_cap,
						       ci_node);
					if (!__cap_is_valid(cap))
						continue;
					if (cap->issued & mask)
						__touch_cap(cap);
				}
			}
			return 1;
		}
	}

	return 0;
}
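/*
 * Callers normally go through the *_metric wrapper below, e.g. (sketch):
 *
 *	if (ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1))
 *		... use the cached xattrs ...
 *
 * so that every mask check is also counted as a cap hit or miss.
 */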
int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
				   int touch)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(ci->netfs.inode.i_sb);
	int r;

	r = __ceph_caps_issued_mask(ci, mask, touch);
	if (r)
		ceph_update_cap_hit(&fsc->mdsc->metric);
	else
		ceph_update_cap_mis(&fsc->mdsc->metric);
	return r;
}
/*
 * Return true if mask caps are currently being revoked by an MDS.
 */
int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
			       struct ceph_cap *ocap, int mask)
{
	struct ceph_cap *cap;
	struct rb_node *p;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (cap != ocap &&
		    (cap->implemented & ~cap->issued & mask))
			return 1;
	}
	return 0;
}
int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_caps_revoking_other(ci, NULL, mask);
	spin_unlock(&ci->i_ceph_lock);
	doutc(cl, "%p %llx.%llx %s = %d\n", inode, ceph_vinop(inode),
	      ceph_cap_string(mask), ret);
	return ret;
}
int __ceph_caps_used(struct ceph_inode_info *ci)
{
	int used = 0;

	if (ci->i_pin_ref)
		used |= CEPH_CAP_PIN;
	if (ci->i_rd_ref)
		used |= CEPH_CAP_FILE_RD;
	if (ci->i_rdcache_ref ||
	    (S_ISREG(ci->netfs.inode.i_mode) &&
	     ci->netfs.inode.i_data.nrpages))
		used |= CEPH_CAP_FILE_CACHE;
	if (ci->i_wr_ref)
		used |= CEPH_CAP_FILE_WR;
	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
		used |= CEPH_CAP_FILE_BUFFER;
	if (ci->i_fx_ref)
		used |= CEPH_CAP_FILE_EXCL;
	return used;
}
#define FMODE_WAIT_BIAS 1000

/*
 * wanted, by virtue of open file modes
 */
int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
	const int PIN_SHIFT = ffs(CEPH_FILE_MODE_PIN);
	const int RD_SHIFT = ffs(CEPH_FILE_MODE_RD);
	const int WR_SHIFT = ffs(CEPH_FILE_MODE_WR);
	const int LAZY_SHIFT = ffs(CEPH_FILE_MODE_LAZY);
	struct ceph_mount_options *opt =
		ceph_inode_to_fs_client(&ci->netfs.inode)->mount_options;
	unsigned long used_cutoff = jiffies - opt->caps_wanted_delay_max * HZ;
	unsigned long idle_cutoff = jiffies - opt->caps_wanted_delay_min * HZ;

	if (S_ISDIR(ci->netfs.inode.i_mode)) {
		int want = 0;

		/* use used_cutoff here, to keep dir's wanted caps longer */
		if (ci->i_nr_by_mode[RD_SHIFT] > 0 ||
		    time_after(ci->i_last_rd, used_cutoff))
			want |= CEPH_CAP_ANY_SHARED;

		if (ci->i_nr_by_mode[WR_SHIFT] > 0 ||
		    time_after(ci->i_last_wr, used_cutoff)) {
			want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
			if (opt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
				want |= CEPH_CAP_ANY_DIR_OPS;
		}

		if (want || ci->i_nr_by_mode[PIN_SHIFT] > 0)
			want |= CEPH_CAP_PIN;

		return want;
	} else {
		int bits = 0;

		if (ci->i_nr_by_mode[RD_SHIFT] > 0) {
			if (ci->i_nr_by_mode[RD_SHIFT] >= FMODE_WAIT_BIAS ||
			    time_after(ci->i_last_rd, used_cutoff))
				bits |= 1 << RD_SHIFT;
		} else if (time_after(ci->i_last_rd, idle_cutoff)) {
			bits |= 1 << RD_SHIFT;
		}

		if (ci->i_nr_by_mode[WR_SHIFT] > 0) {
			if (ci->i_nr_by_mode[WR_SHIFT] >= FMODE_WAIT_BIAS ||
			    time_after(ci->i_last_wr, used_cutoff))
				bits |= 1 << WR_SHIFT;
		} else if (time_after(ci->i_last_wr, idle_cutoff)) {
			bits |= 1 << WR_SHIFT;
		}

		/* check lazyio only when read/write is wanted */
		if ((bits & (CEPH_FILE_MODE_RDWR << 1)) &&
		    ci->i_nr_by_mode[LAZY_SHIFT] > 0)
			bits |= 1 << LAZY_SHIFT;

		return bits ? ceph_caps_for_mode(bits >> 1) : 0;
	}
}
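/*
 * Worked example (illustrative): a regular file with one recent read-only
 * opener ends up with bits == 1 << RD_SHIFT, so this returns roughly
 * ceph_caps_for_mode(CEPH_FILE_MODE_RD), i.e. the pin plus
 * CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE.
 */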
/*
 * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
 */
int __ceph_caps_wanted(struct ceph_inode_info *ci)
{
	int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);

	if (S_ISDIR(ci->netfs.inode.i_mode)) {
		/* we want EXCL if holding caps of dir ops */
		if (w & CEPH_CAP_ANY_DIR_OPS)
			w |= CEPH_CAP_FILE_EXCL;
	} else {
		/* we want EXCL if dirty data */
		if (w & CEPH_CAP_FILE_BUFFER)
			w |= CEPH_CAP_FILE_EXCL;
	}
	return w;
}
/*
 * Return caps we have registered with the MDS(s) as 'wanted'.
 */
int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
{
	struct ceph_cap *cap;
	struct rb_node *p;
	int mds_wanted = 0;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (check && !__cap_is_valid(cap))
			continue;
		if (cap == ci->i_auth_cap)
			mds_wanted |= cap->mds_wanted;
		else
			mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
	}
	return mds_wanted;
}
int ceph_is_any_caps(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_is_any_real_caps(ci);
	spin_unlock(&ci->i_ceph_lock);

	return ret;
}
/*
 * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
 *
 * caller should hold i_ceph_lock.
 * caller will not hold session s_mutex if called from destroy_inode.
 */
void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{
	struct ceph_mds_session *session = cap->session;
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_inode_info *ci = cap->ci;
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc;
	int removed = 0;

	/* 'ci' being NULL means the remove has already occurred */
	if (!ci) {
		doutc(cl, "inode is NULL\n");
		return;
	}

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "%p from %p %llx.%llx\n", cap, inode, ceph_vinop(inode));

	mdsc = ceph_inode_to_fs_client(&ci->netfs.inode)->mdsc;

	/* remove from inode's cap rbtree, and clear auth cap */
	rb_erase(&cap->ci_node, &ci->i_caps);
	if (ci->i_auth_cap == cap)
		ci->i_auth_cap = NULL;

	/* remove from session list */
	spin_lock(&session->s_cap_lock);
	if (session->s_cap_iterator == cap) {
		/* not yet, we are iterating over this very cap */
		doutc(cl, "delaying %p removal from session %p\n", cap,
		      cap->session);
	} else {
		list_del_init(&cap->session_caps);
		session->s_nr_caps--;
		atomic64_dec(&mdsc->metric.total_caps);
		cap->session = NULL;
		removed = 1;
	}
	/* protect backpointer with s_cap_lock: see iterate_session_caps */
	cap->ci = NULL;

	/*
	 * s_cap_reconnect is protected by s_cap_lock. no one changes
	 * s_cap_gen while session is in the reconnect state.
	 */
	if (queue_release &&
	    (!session->s_cap_reconnect ||
	     cap->cap_gen == atomic_read(&session->s_cap_gen))) {
		cap->queue_release = 1;
		if (removed) {
			__ceph_queue_cap_release(session, cap);
			removed = 0;
		}
	} else {
		cap->queue_release = 0;
	}
	cap->cap_ino = ci->i_vino.ino;

	spin_unlock(&session->s_cap_lock);

	if (removed)
		ceph_put_cap(mdsc, cap);

	if (!__ceph_is_any_real_caps(ci)) {
		/* when reconnect denied, we remove session caps forcibly,
		 * i_wr_ref can be non-zero. If there are ongoing write,
		 * keep i_snap_realm.
		 */
		if (ci->i_wr_ref == 0 && ci->i_snap_realm)
			ceph_change_snap_realm(&ci->netfs.inode, NULL);

		__cap_delay_cancel(mdsc, ci);
	}
}
void ceph_remove_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
		     bool queue_release)
{
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_fs_client *fsc;

	/* 'ci' being NULL means the remove has already occurred */
	if (!ci) {
		doutc(mdsc->fsc->client, "inode is NULL\n");
		return;
	}

	lockdep_assert_held(&ci->i_ceph_lock);

	fsc = ceph_inode_to_fs_client(&ci->netfs.inode);
	WARN_ON_ONCE(ci->i_auth_cap == cap &&
		     !list_empty(&ci->i_dirty_item) &&
		     !fsc->blocklisted &&
		     !ceph_inode_is_shutdown(&ci->netfs.inode));

	__ceph_remove_cap(cap, queue_release);
}
struct cap_msg_args {
	struct ceph_mds_session	*session;
	u64			ino, cid, follows;
	u64			flush_tid, oldest_flush_tid, size, max_size;
	u64			xattr_version;
	u64			change_attr;
	struct ceph_buffer	*xattr_buf;
	struct ceph_buffer	*old_xattr_buf;
	struct timespec64	atime, mtime, ctime, btime;
	int			op, caps, wanted, dirty;
	u32			seq, issue_seq, mseq, time_warp_seq;
	u32			flags;
	kuid_t			uid;
	kgid_t			gid;
	umode_t			mode;
	bool			inline_data;
	bool			wake;
	bool			encrypted;
	u32			fscrypt_auth_len;
	u8			fscrypt_auth[sizeof(struct ceph_fscrypt_auth)]; // for context
};
/* Marshal up the cap msg to the MDS */
static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg)
{
	struct ceph_mds_caps *fc;
	void *p;
	struct ceph_mds_client *mdsc = arg->session->s_mdsc;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;

	doutc(mdsc->fsc->client,
	      "%s %llx %llx caps %s wanted %s dirty %s seq %u/%u"
	      " tid %llu/%llu mseq %u follows %lld size %llu/%llu"
	      " xattr_ver %llu xattr_len %d\n",
	      ceph_cap_op_name(arg->op), arg->cid, arg->ino,
	      ceph_cap_string(arg->caps), ceph_cap_string(arg->wanted),
	      ceph_cap_string(arg->dirty), arg->seq, arg->issue_seq,
	      arg->flush_tid, arg->oldest_flush_tid, arg->mseq, arg->follows,
	      arg->size, arg->max_size, arg->xattr_version,
	      arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);

	msg->hdr.version = cpu_to_le16(12);
	msg->hdr.tid = cpu_to_le64(arg->flush_tid);

	fc = msg->front.iov_base;
	memset(fc, 0, sizeof(*fc));

	fc->cap_id = cpu_to_le64(arg->cid);
	fc->op = cpu_to_le32(arg->op);
	fc->seq = cpu_to_le32(arg->seq);
	fc->issue_seq = cpu_to_le32(arg->issue_seq);
	fc->migrate_seq = cpu_to_le32(arg->mseq);
	fc->caps = cpu_to_le32(arg->caps);
	fc->wanted = cpu_to_le32(arg->wanted);
	fc->dirty = cpu_to_le32(arg->dirty);
	fc->ino = cpu_to_le64(arg->ino);
	fc->snap_follows = cpu_to_le64(arg->follows);

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
	if (arg->encrypted)
		fc->size = cpu_to_le64(round_up(arg->size,
						CEPH_FSCRYPT_BLOCK_SIZE));
	else
#endif
		fc->size = cpu_to_le64(arg->size);
	fc->max_size = cpu_to_le64(arg->max_size);
	ceph_encode_timespec64(&fc->mtime, &arg->mtime);
	ceph_encode_timespec64(&fc->atime, &arg->atime);
	ceph_encode_timespec64(&fc->ctime, &arg->ctime);
	fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);

	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
	fc->mode = cpu_to_le32(arg->mode);

	fc->xattr_version = cpu_to_le64(arg->xattr_version);
	if (arg->xattr_buf) {
		msg->middle = ceph_buffer_get(arg->xattr_buf);
		fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
		msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
	}

	p = fc + 1;
	/* flock buffer size (version 2) */
	ceph_encode_32(&p, 0);
	/* inline version (version 4) */
	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
	/* inline data size */
	ceph_encode_32(&p, 0);
	/*
	 * osd_epoch_barrier (version 5)
	 * The epoch_barrier is protected by osdc->lock, so READ_ONCE here in
	 * case it was recently changed
	 */
	ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
	/* oldest_flush_tid (version 6) */
	ceph_encode_64(&p, arg->oldest_flush_tid);

	/*
	 * caller_uid/caller_gid (version 7)
	 *
	 * Currently, we don't properly track which caller dirtied the caps
	 * last, and force a flush of them when there is a conflict. For now,
	 * just set this to 0:0, to emulate how the MDS has worked up to now.
	 */
	ceph_encode_32(&p, 0);
	ceph_encode_32(&p, 0);

	/* pool namespace (version 8) (mds always ignores this) */
	ceph_encode_32(&p, 0);

	/* btime and change_attr (version 9) */
	ceph_encode_timespec64(p, &arg->btime);
	p += sizeof(struct ceph_timespec);
	ceph_encode_64(&p, arg->change_attr);

	/* Advisory flags (version 10) */
	ceph_encode_32(&p, arg->flags);

	/* dirstats (version 11) - these are r/o on the client */
	ceph_encode_64(&p, 0);
	ceph_encode_64(&p, 0);

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
	/*
	 * fscrypt_auth and fscrypt_file (version 12)
	 *
	 * fscrypt_auth holds the crypto context (if any). fscrypt_file
	 * tracks the real i_size as an __le64 field (and we use a rounded-up
	 * i_size in the traditional size field).
	 */
	ceph_encode_32(&p, arg->fscrypt_auth_len);
	ceph_encode_copy(&p, arg->fscrypt_auth, arg->fscrypt_auth_len);
	ceph_encode_32(&p, sizeof(__le64));
	ceph_encode_64(&p, arg->size);
#else /* CONFIG_FS_ENCRYPTION */
	/* fscrypt_auth and fscrypt_file (version 12) */
	ceph_encode_32(&p, 0);
	ceph_encode_32(&p, 0);
#endif /* CONFIG_FS_ENCRYPTION */
}
/*
 * Queue cap releases when an inode is dropped from our cache.
 */
void __ceph_remove_caps(struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct rb_node *p;

	/* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
	 * may call __ceph_caps_issued_mask() on a freeing inode. */
	spin_lock(&ci->i_ceph_lock);
	p = rb_first(&ci->i_caps);
	while (p) {
		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);

		p = rb_next(p);
		ceph_remove_cap(mdsc, cap, true);
	}
	spin_unlock(&ci->i_ceph_lock);
}
/*
 * Prepare to send a cap message to an MDS. Update the cap state, and populate
 * the arg struct with the parameters that will need to be sent. This should
 * be done under the i_ceph_lock to guard against changes to cap state.
 *
 * Make note of max_size reported/requested from mds, revoked caps
 * that have now been implemented.
 */
static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
		       int op, int flags, int used, int want, int retain,
		       int flushing, u64 flush_tid, u64 oldest_flush_tid)
{
	struct ceph_inode_info *ci = cap->ci;
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int held, revoking;

	lockdep_assert_held(&ci->i_ceph_lock);

	held = cap->issued | cap->implemented;
	revoking = cap->implemented & ~cap->issued;
	retain &= ~revoking;

	doutc(cl, "%p %llx.%llx cap %p session %p %s -> %s (revoking %s)\n",
	      inode, ceph_vinop(inode), cap, cap->session,
	      ceph_cap_string(held), ceph_cap_string(held & retain),
	      ceph_cap_string(revoking));
	BUG_ON((retain & CEPH_CAP_PIN) == 0);

	ci->i_ceph_flags &= ~CEPH_I_FLUSH;

	cap->issued &= retain;  /* drop bits we don't want */
	/*
	 * Wake up any waiters on wanted -> needed transition. This is due to
	 * the weird transition from buffered to sync IO... we need to flush
	 * dirty pages _before_ allowing sync writes to avoid reordering.
	 */
	arg->wake = cap->implemented & ~cap->issued;
	cap->implemented &= cap->issued | used;
	cap->mds_wanted = want;

	arg->session = cap->session;
	arg->ino = ceph_vino(inode).ino;
	arg->cid = cap->cap_id;
	arg->follows = flushing ? ci->i_head_snapc->seq : 0;
	arg->flush_tid = flush_tid;
	arg->oldest_flush_tid = oldest_flush_tid;
	arg->size = i_size_read(inode);
	ci->i_reported_size = arg->size;
	arg->max_size = ci->i_wanted_max_size;
	if (cap == ci->i_auth_cap) {
		if (want & CEPH_CAP_ANY_FILE_WR)
			ci->i_requested_max_size = arg->max_size;
		else
			ci->i_requested_max_size = 0;
	}

	if (flushing & CEPH_CAP_XATTR_EXCL) {
		arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
		arg->xattr_version = ci->i_xattrs.version;
		arg->xattr_buf = ceph_buffer_get(ci->i_xattrs.blob);
	} else {
		arg->xattr_buf = NULL;
		arg->old_xattr_buf = NULL;
	}

	arg->mtime = inode_get_mtime(inode);
	arg->atime = inode_get_atime(inode);
	arg->ctime = inode_get_ctime(inode);
	arg->btime = ci->i_btime;
	arg->change_attr = inode_peek_iversion_raw(inode);

	arg->op = op;
	arg->caps = cap->implemented;
	arg->wanted = want;
	arg->dirty = flushing;

	arg->seq = cap->seq;
	arg->issue_seq = cap->issue_seq;
	arg->mseq = cap->mseq;
	arg->time_warp_seq = ci->i_time_warp_seq;

	arg->uid = inode->i_uid;
	arg->gid = inode->i_gid;
	arg->mode = inode->i_mode;

	arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
	if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
	    !list_empty(&ci->i_cap_snaps)) {
		struct ceph_cap_snap *capsnap;

		list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->cap_flush.tid)
				break;
			if (capsnap->need_flush) {
				flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
				break;
			}
		}
	}
	arg->flags = flags;
	arg->encrypted = IS_ENCRYPTED(inode);
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
	if (ci->fscrypt_auth_len &&
	    WARN_ON_ONCE(ci->fscrypt_auth_len > sizeof(struct ceph_fscrypt_auth))) {
		/* Don't set this if it's too big */
		arg->fscrypt_auth_len = 0;
	} else {
		arg->fscrypt_auth_len = ci->fscrypt_auth_len;
		memcpy(arg->fscrypt_auth, ci->fscrypt_auth,
		       min_t(size_t, ci->fscrypt_auth_len,
			     sizeof(arg->fscrypt_auth)));
	}
#endif /* CONFIG_FS_ENCRYPTION */
}
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
#define CAP_MSG_FIXED_FIELDS (sizeof(struct ceph_mds_caps) + \
		      4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 4 + 4 + 8)

static inline int cap_msg_size(struct cap_msg_args *arg)
{
	return CAP_MSG_FIXED_FIELDS + arg->fscrypt_auth_len;
}
#else
#define CAP_MSG_FIXED_FIELDS (sizeof(struct ceph_mds_caps) + \
		      4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 4 + 4)

static inline int cap_msg_size(struct cap_msg_args *arg)
{
	return CAP_MSG_FIXED_FIELDS;
}
#endif /* CONFIG_FS_ENCRYPTION */
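/*
 * The numeric literals in CAP_MSG_FIXED_FIELDS mirror, in order, the
 * ceph_encode_* calls in encode_cap_msg() past the fixed ceph_mds_caps
 * header: flock len (4), inline version (8) and len (4), epoch barrier (4),
 * oldest_flush_tid (8), caller uid/gid (4+4), pool namespace (4), btime (8),
 * change_attr (8), flags (4), dirstats (8+8), the fscrypt_auth/fscrypt_file
 * lengths (4+4) and, with fscrypt enabled, the 8-byte real size.
 */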
/*
 * Send a cap msg on the given inode.
 *
 * Caller should hold snap_rwsem (read), s_mutex.
 */
static void __send_cap(struct cap_msg_args *arg, struct ceph_inode_info *ci)
{
	struct ceph_msg *msg;
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, cap_msg_size(arg), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err_client(cl,
			      "error allocating cap msg: ino (%llx.%llx)"
			      " flushing %s tid %llu, requeuing cap.\n",
			      ceph_vinop(inode), ceph_cap_string(arg->dirty),
			      arg->flush_tid);
		spin_lock(&ci->i_ceph_lock);
		__cap_delay_requeue(arg->session->s_mdsc, ci);
		spin_unlock(&ci->i_ceph_lock);
		return;
	}

	encode_cap_msg(msg, arg);
	ceph_con_send(&arg->session->s_con, msg);
	ceph_buffer_put(arg->old_xattr_buf);
	ceph_buffer_put(arg->xattr_buf);
	if (arg->wake)
		wake_up_all(&ci->i_cap_wq);
}
static inline int __send_flush_snap(struct inode *inode,
				    struct ceph_mds_session *session,
				    struct ceph_cap_snap *capsnap,
				    u32 mseq, u64 oldest_flush_tid)
{
	struct cap_msg_args	arg;
	struct ceph_msg		*msg;

	arg.session = session;
	arg.ino = ceph_vino(inode).ino;
	arg.cid = 0;
	arg.follows = capsnap->follows;
	arg.flush_tid = capsnap->cap_flush.tid;
	arg.oldest_flush_tid = oldest_flush_tid;

	arg.size = capsnap->size;
	arg.max_size = 0;
	arg.xattr_version = capsnap->xattr_version;
	arg.xattr_buf = capsnap->xattr_blob;
	arg.old_xattr_buf = NULL;

	arg.atime = capsnap->atime;
	arg.mtime = capsnap->mtime;
	arg.ctime = capsnap->ctime;
	arg.btime = capsnap->btime;
	arg.change_attr = capsnap->change_attr;

	arg.op = CEPH_CAP_OP_FLUSHSNAP;
	arg.caps = capsnap->issued;
	arg.wanted = 0;
	arg.dirty = capsnap->dirty;

	arg.seq = 0;
	arg.issue_seq = 0;
	arg.mseq = mseq;
	arg.time_warp_seq = capsnap->time_warp_seq;

	arg.uid = capsnap->uid;
	arg.gid = capsnap->gid;
	arg.mode = capsnap->mode;

	arg.inline_data = capsnap->inline_data;
	arg.flags = 0;
	arg.wake = false;
	arg.encrypted = IS_ENCRYPTED(inode);

	/* No fscrypt_auth changes from a capsnap.*/
	arg.fscrypt_auth_len = 0;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, cap_msg_size(&arg),
			   GFP_NOFS, false);
	if (!msg)
		return -ENOMEM;

	encode_cap_msg(msg, &arg);
	ceph_con_send(&arg.session->s_con, msg);
	return 0;
}
/*
 * When a snapshot is taken, clients accumulate dirty metadata on
 * inodes with capabilities in ceph_cap_snaps to describe the file
 * state at the time the snapshot was taken.  This must be flushed
 * asynchronously back to the MDS once sync writes complete and dirty
 * data is written out.
 *
 * Called under i_ceph_lock.
 */
static void __ceph_flush_snaps(struct ceph_inode_info *ci,
			       struct ceph_mds_session *session)
		__releases(ci->i_ceph_lock)
		__acquires(ci->i_ceph_lock)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap_snap *capsnap;
	u64 oldest_flush_tid = 0;
	u64 first_tid = 1, last_tid = 0;

	doutc(cl, "%p %llx.%llx session %p\n", inode, ceph_vinop(inode),
	      session);

	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		/*
		 * we need to wait for sync writes to complete and for dirty
		 * pages to be written out.
		 */
		if (capsnap->dirty_pages || capsnap->writing)
			break;

		/* should be removed by ceph_try_drop_cap_snap() */
		BUG_ON(!capsnap->need_flush);

		/* only flush each capsnap once */
		if (capsnap->cap_flush.tid > 0) {
			doutc(cl, "already flushed %p, skipping\n", capsnap);
			continue;
		}

		spin_lock(&mdsc->cap_dirty_lock);
		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
		list_add_tail(&capsnap->cap_flush.g_list,
			      &mdsc->cap_flush_list);
		if (oldest_flush_tid == 0)
			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
		if (list_empty(&ci->i_flushing_item)) {
			list_add_tail(&ci->i_flushing_item,
				      &session->s_cap_flushing);
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		list_add_tail(&capsnap->cap_flush.i_list,
			      &ci->i_cap_flush_list);

		if (first_tid == 1)
			first_tid = capsnap->cap_flush.tid;
		last_tid = capsnap->cap_flush.tid;
	}

	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;

	while (first_tid <= last_tid) {
		struct ceph_cap *cap = ci->i_auth_cap;
		struct ceph_cap_flush *cf = NULL, *iter;
		int ret;

		if (!(cap && cap->session == session)) {
			doutc(cl, "%p %llx.%llx auth cap %p not mds%d, stop\n",
			      inode, ceph_vinop(inode), cap, session->s_mds);
			break;
		}

		ret = -ENOENT;
		list_for_each_entry(iter, &ci->i_cap_flush_list, i_list) {
			if (iter->tid >= first_tid) {
				cf = iter;
				ret = 0;
				break;
			}
		}
		if (ret < 0)
			break;

		first_tid = cf->tid + 1;

		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
		refcount_inc(&capsnap->nref);
		spin_unlock(&ci->i_ceph_lock);

		doutc(cl, "%p %llx.%llx capsnap %p tid %llu %s\n", inode,
		      ceph_vinop(inode), capsnap, cf->tid,
		      ceph_cap_string(capsnap->dirty));

		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
					oldest_flush_tid);
		if (ret < 0) {
			pr_err_client(cl, "error sending cap flushsnap, "
				      "ino (%llx.%llx) tid %llu follows %llu\n",
				      ceph_vinop(inode), cf->tid,
				      capsnap->follows);
		}

		ceph_put_cap_snap(capsnap);
		spin_lock(&ci->i_ceph_lock);
	}
}
void ceph_flush_snaps(struct ceph_inode_info *ci,
		      struct ceph_mds_session **psession)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_mds_session *session = NULL;
	bool need_put = false;
	int mds;

	doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
	if (psession)
		session = *psession;
retry:
	spin_lock(&ci->i_ceph_lock);
	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
		doutc(cl, " no capsnap needs flush, doing nothing\n");
		goto out;
	}
	if (!ci->i_auth_cap) {
		doutc(cl, " no auth cap (migrating?), doing nothing\n");
		goto out;
	}

	mds = ci->i_auth_cap->session->s_mds;
	if (session && session->s_mds != mds) {
		doutc(cl, " oops, wrong session %p mutex\n", session);
		ceph_put_mds_session(session);
		session = NULL;
	}
	if (!session) {
		spin_unlock(&ci->i_ceph_lock);
		mutex_lock(&mdsc->mutex);
		session = __ceph_lookup_mds_session(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
		goto retry;
	}

	// make sure flushsnap messages are sent in proper order.
	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
		__kick_flushing_caps(mdsc, session, ci, 0);

	__ceph_flush_snaps(ci, session);
out:
	spin_unlock(&ci->i_ceph_lock);

	if (psession)
		*psession = session;
	else
		ceph_put_mds_session(session);
	/* we flushed them all; remove this inode from the queue */
	spin_lock(&mdsc->snap_flush_lock);
	if (!list_empty(&ci->i_snap_flush_item))
		need_put = true;
	list_del_init(&ci->i_snap_flush_item);
	spin_unlock(&mdsc->snap_flush_lock);

	if (need_put)
		iput(inode);
}
/*
 * Mark caps dirty.  If inode is newly dirty, return the dirty flags.
 * Caller is then responsible for calling __mark_inode_dirty with the
 * returned flags value.
 */
int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
			   struct ceph_cap_flush **pcf)
{
	struct ceph_mds_client *mdsc =
		ceph_sb_to_fs_client(ci->netfs.inode.i_sb)->mdsc;
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int was = ci->i_dirty_caps;
	int dirty = 0;

	lockdep_assert_held(&ci->i_ceph_lock);

	if (!ci->i_auth_cap) {
		pr_warn_client(cl, "%p %llx.%llx mask %s, "
			       "but no auth cap (session was closed?)\n",
			       inode, ceph_vinop(inode),
			       ceph_cap_string(mask));
		return 0;
	}

	doutc(cl, "%p %llx.%llx %s dirty %s -> %s\n", inode,
	      ceph_vinop(inode), ceph_cap_string(mask),
	      ceph_cap_string(was), ceph_cap_string(was | mask));
	ci->i_dirty_caps |= mask;
	if (was == 0) {
		struct ceph_mds_session *session = ci->i_auth_cap->session;

		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
		swap(ci->i_prealloc_cap_flush, *pcf);

		if (!ci->i_head_snapc) {
			WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
			ci->i_head_snapc = ceph_get_snap_context(
				ci->i_snap_realm->cached_context);
		}
		doutc(cl, "%p %llx.%llx now dirty snapc %p auth cap %p\n",
		      inode, ceph_vinop(inode), ci->i_head_snapc,
		      ci->i_auth_cap);
		BUG_ON(!list_empty(&ci->i_dirty_item));
		spin_lock(&mdsc->cap_dirty_lock);
		list_add(&ci->i_dirty_item, &session->s_cap_dirty);
		spin_unlock(&mdsc->cap_dirty_lock);
		if (ci->i_flushing_caps == 0) {
			ihold(inode);
			dirty |= I_DIRTY_SYNC;
		}
	} else {
		WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
	}
	BUG_ON(list_empty(&ci->i_dirty_item));
	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
	    (mask & CEPH_CAP_FILE_BUFFER))
		dirty |= I_DIRTY_DATASYNC;
	__cap_delay_requeue(mdsc, ci);
	return dirty;
}
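/*
 * Typical caller pattern (a sketch): the write path allocates a
 * ceph_cap_flush with ceph_alloc_cap_flush() before taking i_ceph_lock,
 * then calls __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf)
 * and passes the returned flags to __mark_inode_dirty().
 */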
struct ceph_cap_flush *ceph_alloc_cap_flush(void)
{
	struct ceph_cap_flush *cf;

	cf = kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
	if (!cf)
		return NULL;

	cf->is_capsnap = false;
	return cf;
}

void ceph_free_cap_flush(struct ceph_cap_flush *cf)
{
	if (cf)
		kmem_cache_free(ceph_cap_flush_cachep, cf);
}
static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
{
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		return cf->tid;
	}
	return 0;
}
/*
 * Remove cap_flush from the mdsc's or inode's flushing cap list.
 * Return true if caller needs to wake up flush waiters.
 */
static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
					 struct ceph_cap_flush *cf)
{
	struct ceph_cap_flush *prev;
	bool wake = cf->wake;

	if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
		prev = list_prev_entry(cf, g_list);
		prev->wake = true;
		wake = false;
	}
	list_del_init(&cf->g_list);
	return wake;
}

static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
				       struct ceph_cap_flush *cf)
{
	struct ceph_cap_flush *prev;
	bool wake = cf->wake;

	if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
		prev = list_prev_entry(cf, i_list);
		prev->wake = true;
		wake = false;
	}
	list_del_init(&cf->i_list);
	return wake;
}
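/*
 * A note on the wake hand-off above: if the detached entry carried a wake
 * flag but was not the oldest on its list, the flag is pushed onto the
 * previous (older) entry instead, so waiters are only woken once the oldest
 * outstanding flush actually completes.
 */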
/*
 * Add dirty inode to the flushing list.  Assigned a seq number so we
 * can wait for caps to flush without starving.
 *
 * Called under i_ceph_lock. Returns the flush tid.
 */
static u64 __mark_caps_flushing(struct inode *inode,
				struct ceph_mds_session *session, bool wake,
				u64 *oldest_flush_tid)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap_flush *cf = NULL;
	int flushing;

	lockdep_assert_held(&ci->i_ceph_lock);
	BUG_ON(ci->i_dirty_caps == 0);
	BUG_ON(list_empty(&ci->i_dirty_item));
	BUG_ON(!ci->i_prealloc_cap_flush);

	flushing = ci->i_dirty_caps;
	doutc(cl, "flushing %s, flushing_caps %s -> %s\n",
	      ceph_cap_string(flushing),
	      ceph_cap_string(ci->i_flushing_caps),
	      ceph_cap_string(ci->i_flushing_caps | flushing));
	ci->i_flushing_caps |= flushing;
	ci->i_dirty_caps = 0;
	doutc(cl, "%p %llx.%llx now !dirty\n", inode, ceph_vinop(inode));

	swap(cf, ci->i_prealloc_cap_flush);
	cf->caps = flushing;
	cf->wake = wake;

	spin_lock(&mdsc->cap_dirty_lock);
	list_del_init(&ci->i_dirty_item);

	cf->tid = ++mdsc->last_cap_flush_tid;
	list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
	*oldest_flush_tid = __get_oldest_flush_tid(mdsc);

	if (list_empty(&ci->i_flushing_item)) {
		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
		mdsc->num_cap_flushing++;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	list_add_tail(&cf->i_list, &ci->i_cap_flush_list);

	return cf->tid;
}
/*
 * try to invalidate mapping pages without blocking.
 */
static int try_nonblocking_invalidate(struct inode *inode)
	__releases(ci->i_ceph_lock)
	__acquires(ci->i_ceph_lock)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	u32 invalidating_gen = ci->i_rdcache_gen;

	spin_unlock(&ci->i_ceph_lock);
	ceph_fscache_invalidate(inode, false);
	invalidate_mapping_pages(&inode->i_data, 0, -1);
	spin_lock(&ci->i_ceph_lock);

	if (inode->i_data.nrpages == 0 &&
	    invalidating_gen == ci->i_rdcache_gen) {
		/* success. */
		doutc(cl, "%p %llx.%llx success\n", inode,
		      ceph_vinop(inode));
		/* save any racing async invalidate some trouble */
		ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
		return 0;
	}
	doutc(cl, "%p %llx.%llx failed\n", inode, ceph_vinop(inode));
	return -1;
}
bool __ceph_should_report_size(struct ceph_inode_info *ci)
{
	loff_t size = i_size_read(&ci->netfs.inode);
	/* mds will adjust max size according to the reported size */
	if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
		return false;
	if (size >= ci->i_max_size)
		return true;
	/* half of previous max_size increment has been used */
	if (ci->i_max_size > ci->i_reported_size &&
	    (size << 1) >= ci->i_max_size + ci->i_reported_size)
		return true;
	return false;
}
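/*
 * Example (illustrative): with i_reported_size == 0 and i_max_size == 4M,
 * the "half of the increment used" test fires once size reaches 2M, since
 * (2M << 1) >= 4M + 0 -- the client reports early rather than stalling
 * writes right at the max_size boundary.
 */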
2013 * Swiss army knife function to examine currently used and wanted
2014 * versus held caps. Release, flush, ack revoked caps to mds as
2017 * CHECK_CAPS_AUTHONLY - we should only check the auth cap
2018 * CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
2020 * CHECK_CAPS_FLUSH_FORCE - we should flush any caps immediately, without
void ceph_check_caps(struct ceph_inode_info *ci, int flags)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap *cap;
	u64 flush_tid, oldest_flush_tid;
	int file_wanted, used, cap_used;
	int issued, implemented, want, retain, revoking, flushing = 0;
	int mds = -1;	/* keep track of how far we've gone through i_caps list
			   to avoid an infinite loop on retry */
	struct rb_node *p;
	bool queue_invalidate = false;
	bool tried_invalidate = false;
	bool queue_writeback = false;
	struct ceph_mds_session *session = NULL;

	spin_lock(&ci->i_ceph_lock);
	if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
		ci->i_ceph_flags |= CEPH_I_ASYNC_CHECK_CAPS;

		/* Don't send messages until we get async create reply */
		spin_unlock(&ci->i_ceph_lock);
		return;
	}

	if (ci->i_ceph_flags & CEPH_I_FLUSH)
		flags |= CHECK_CAPS_FLUSH;
retry:
	/* Caps wanted by virtue of active open files. */
	file_wanted = __ceph_caps_file_wanted(ci);

	/* Caps which have active references against them */
	used = __ceph_caps_used(ci);

	/*
	 * "issued" represents the current caps that the MDS wants us to have.
	 * "implemented" is the set that we have been granted, and includes the
	 * ones that have not yet been returned to the MDS (the "revoking" set,
	 * usually because they have outstanding references).
	 */
	issued = __ceph_caps_issued(ci, &implemented);
	revoking = implemented & ~issued;

	want = file_wanted;

	/* The ones we currently want to retain (may be adjusted below) */
	retain = file_wanted | used | CEPH_CAP_PIN;
	if (!mdsc->stopping && inode->i_nlink > 0) {
		if (file_wanted) {
			retain |= CEPH_CAP_ANY;       /* be greedy */
		} else if (S_ISDIR(inode->i_mode) &&
			   (issued & CEPH_CAP_FILE_SHARED) &&
			   __ceph_dir_is_complete(ci)) {
			/*
			 * If a directory is complete, we want to keep
			 * the exclusive cap. So that MDS does not end up
			 * revoking the shared cap on every create/unlink
			 * operation.
			 */
			if (IS_RDONLY(inode)) {
				want = CEPH_CAP_ANY_SHARED;
			} else {
				want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
			}
			retain |= want;
		} else {

			retain |= CEPH_CAP_ANY_SHARED;
			/*
			 * keep RD only if we didn't have the file open RW,
			 * because then the mds would revoke it anyway to
			 * journal max_size=0.
			 */
			if (ci->i_max_size == 0)
				retain |= CEPH_CAP_ANY_RD;
		}
	}

	doutc(cl, "%p %llx.%llx file_want %s used %s dirty %s "
	      "flushing %s issued %s revoking %s retain %s %s%s%s%s\n",
	      inode, ceph_vinop(inode), ceph_cap_string(file_wanted),
	      ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
	      ceph_cap_string(ci->i_flushing_caps),
	      ceph_cap_string(issued), ceph_cap_string(revoking),
	      ceph_cap_string(retain),
	      (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
	      (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "",
	      (flags & CHECK_CAPS_NOINVAL) ? " NOINVAL" : "",
	      (flags & CHECK_CAPS_FLUSH_FORCE) ? " FLUSH_FORCE" : "");

	/*
	 * If we no longer need to hold onto our old caps, and we may
	 * have cached pages, but don't want them, then try to invalidate.
	 * If we fail, it's because pages are locked.... try again later.
	 */
	if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
	    S_ISREG(inode->i_mode) &&
	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
	    inode->i_data.nrpages &&		/* have cached pages */
	    (revoking & (CEPH_CAP_FILE_CACHE|
			 CEPH_CAP_FILE_LAZYIO)) && /*  or revoking cache */
	    !tried_invalidate) {
		doutc(cl, "trying to invalidate on %p %llx.%llx\n",
		      inode, ceph_vinop(inode));
		if (try_nonblocking_invalidate(inode) < 0) {
			doutc(cl, "queuing invalidate\n");
			queue_invalidate = true;
			ci->i_rdcache_revoking = ci->i_rdcache_gen;
		}
		tried_invalidate = true;
		goto retry;
	}

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		int mflags = 0;
		struct cap_msg_args arg;

		cap = rb_entry(p, struct ceph_cap, ci_node);

		/* avoid looping forever */
		if (mds >= cap->mds ||
		    ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
			continue;

		/*
		 * If we have an auth cap, we don't need to consider any
		 * overlapping caps as used.
		 */
		cap_used = used;
		if (ci->i_auth_cap && cap != ci->i_auth_cap)
			cap_used &= ~ci->i_auth_cap->issued;

		revoking = cap->implemented & ~cap->issued;
		doutc(cl, " mds%d cap %p used %s issued %s implemented %s revoking %s\n",
		      cap->mds, cap, ceph_cap_string(cap_used),
		      ceph_cap_string(cap->issued),
		      ceph_cap_string(cap->implemented),
		      ceph_cap_string(revoking));

		/* completed revocation? going down and there are no caps? */
		if (revoking) {
			if ((revoking & cap_used) == 0) {
				doutc(cl, "completed revocation of %s\n",
				      ceph_cap_string(cap->implemented & ~cap->issued));
				goto ack;
			}

			/*
			 * If the "i_wrbuffer_ref" was increased by mmap or generic
			 * cache write just before the ceph_check_caps() is called,
			 * the Fb capability revoking will fail this time. Then we
			 * must wait for the BDI's delayed work to flush the dirty
			 * pages and to release the "i_wrbuffer_ref", which will cost
			 * at most 5 seconds. That means the MDS needs to wait at
			 * most 5 seconds to finish the Fb capability's revocation.
			 *
			 * Let's queue a writeback for it.
			 */
			if (S_ISREG(inode->i_mode) && ci->i_wrbuffer_ref &&
			    (revoking & CEPH_CAP_FILE_BUFFER))
				queue_writeback = true;
		}

		if (flags & CHECK_CAPS_FLUSH_FORCE) {
			doutc(cl, "force to flush caps\n");
			goto ack;
		}

		if (cap == ci->i_auth_cap &&
		    (cap->issued & CEPH_CAP_FILE_WR)) {
			/* request larger max_size from MDS? */
			if (ci->i_wanted_max_size > ci->i_max_size &&
			    ci->i_wanted_max_size > ci->i_requested_max_size) {
				doutc(cl, "requesting new max_size\n");
				goto ack;
			}

			/* approaching file_max? */
			if (__ceph_should_report_size(ci)) {
				doutc(cl, "i_size approaching max_size\n");
				goto ack;
			}
		}
		/* flush anything dirty? */
		if (cap == ci->i_auth_cap) {
			if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
				doutc(cl, "flushing dirty caps\n");
				goto ack;
			}
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
				doutc(cl, "flushing snap caps\n");
				goto ack;
			}
		}

		/* want more caps from mds? */
		if (want & ~cap->mds_wanted) {
			if (want & ~(cap->mds_wanted | cap->issued))
				goto ack;
			if (!__cap_is_valid(cap))
				goto ack;
		}

		/* things we might delay */
		if ((cap->issued & ~retain) == 0)
			continue;     /* nope, all good */

ack:
		ceph_put_mds_session(session);
		session = ceph_get_mds_session(cap->session);

		/* kick flushing and flush snaps before sending normal
		 * cap message */
		if (cap == ci->i_auth_cap &&
		    (ci->i_ceph_flags &
		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
				__kick_flushing_caps(mdsc, session, ci, 0);
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
				__ceph_flush_snaps(ci, session);

			goto retry;
		}

		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
			flushing = ci->i_dirty_caps;
			flush_tid = __mark_caps_flushing(inode, session, false,
							 &oldest_flush_tid);
			if (flags & CHECK_CAPS_FLUSH &&
			    list_empty(&session->s_cap_dirty))
				mflags |= CEPH_CLIENT_CAPS_SYNC;
		} else {
			flushing = 0;
			flush_tid = 0;
			spin_lock(&mdsc->cap_dirty_lock);
			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
			spin_unlock(&mdsc->cap_dirty_lock);
		}

		mds = cap->mds;  /* remember mds, so we don't repeat */

		__prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
			   want, retain, flushing, flush_tid, oldest_flush_tid);

		spin_unlock(&ci->i_ceph_lock);
		__send_cap(&arg, ci);
		spin_lock(&ci->i_ceph_lock);

		goto retry; /* retake i_ceph_lock and restart our cap scan. */
	}

	/* periodically re-calculate caps wanted by open files */
	if (__ceph_is_any_real_caps(ci) &&
	    list_empty(&ci->i_cap_delay_list) &&
	    (file_wanted & ~CEPH_CAP_PIN) &&
	    !(used & (CEPH_CAP_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
		__cap_delay_requeue(mdsc, ci);
	}

	spin_unlock(&ci->i_ceph_lock);

	ceph_put_mds_session(session);
	if (queue_writeback)
		ceph_queue_writeback(inode);
	if (queue_invalidate)
		ceph_queue_invalidate(inode);
}
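
/*
 * Note on the retry pattern above: __send_cap() must be called without
 * i_ceph_lock held, so after each send we retake the lock and jump back to
 * the top of the scan.  The rb-tree of caps may have changed in the
 * meantime; the "mds" high-water mark is what keeps the restarted walk from
 * revisiting caps we have already handled.
 */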
/*
 * Try to flush dirty caps back to the auth mds.
 */
static int try_flush_caps(struct inode *inode, u64 *ptid)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int flushing = 0;
	u64 flush_tid = 0, oldest_flush_tid = 0;

	spin_lock(&ci->i_ceph_lock);
retry_locked:
	if (ci->i_dirty_caps && ci->i_auth_cap) {
		struct ceph_cap *cap = ci->i_auth_cap;
		struct cap_msg_args arg;
		struct ceph_mds_session *session = cap->session;

		if (session->s_state < CEPH_MDS_SESSION_OPEN) {
			spin_unlock(&ci->i_ceph_lock);
			goto out;
		}

		if (ci->i_ceph_flags &
		    (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) {
			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
				__kick_flushing_caps(mdsc, session, ci, 0);
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
				__ceph_flush_snaps(ci, session);
			goto retry_locked;
		}

		flushing = ci->i_dirty_caps;
		flush_tid = __mark_caps_flushing(inode, session, true,
						 &oldest_flush_tid);

		__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
			   __ceph_caps_used(ci), __ceph_caps_wanted(ci),
			   (cap->issued | cap->implemented),
			   flushing, flush_tid, oldest_flush_tid);
		spin_unlock(&ci->i_ceph_lock);

		__send_cap(&arg, ci);
	} else {
		if (!list_empty(&ci->i_cap_flush_list)) {
			struct ceph_cap_flush *cf =
				list_last_entry(&ci->i_cap_flush_list,
						struct ceph_cap_flush, i_list);
			cf->wake = true;
			flush_tid = cf->tid;
		}
		flushing = ci->i_flushing_caps;
		spin_unlock(&ci->i_ceph_lock);
	}
out:
	*ptid = flush_tid;
	return flushing;
}
/*
 * Return true if we've flushed caps through the given flush_tid.
 */
static int caps_are_flushed(struct inode *inode, u64 flush_tid)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int ret = 1;

	spin_lock(&ci->i_ceph_lock);
	if (!list_empty(&ci->i_cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&ci->i_cap_flush_list,
					 struct ceph_cap_flush, i_list);
		if (cf->tid <= flush_tid)
			ret = 0;
	}
	spin_unlock(&ci->i_ceph_lock);
	return ret;
}
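
/*
 * This works because i_cap_flush_list is kept ordered by tid: if the oldest
 * entry still on the list is newer than flush_tid, every flush up to and
 * including flush_tid must already have been acked by the MDS.
 */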
/*
 * flush the mdlog and wait for any unsafe requests to complete.
 */
static int flush_mdlog_and_wait_inode_unsafe_requests(struct inode *inode)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_request *req1 = NULL, *req2 = NULL;
	int ret, err = 0;

	spin_lock(&ci->i_unsafe_lock);
	if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) {
		req1 = list_last_entry(&ci->i_unsafe_dirops,
					struct ceph_mds_request,
					r_unsafe_dir_item);
		ceph_mdsc_get_request(req1);
	}
	if (!list_empty(&ci->i_unsafe_iops)) {
		req2 = list_last_entry(&ci->i_unsafe_iops,
					struct ceph_mds_request,
					r_unsafe_target_item);
		ceph_mdsc_get_request(req2);
	}
	spin_unlock(&ci->i_unsafe_lock);

	/*
	 * Trigger to flush the journal logs in all the relevant MDSes
	 * manually, or in the worst case wait up to 5 seconds for the
	 * MDSes to flush their journal logs periodically.
	 */
	if (req1 || req2) {
		struct ceph_mds_request *req;
		struct ceph_mds_session **sessions;
		struct ceph_mds_session *s;
		unsigned int max_sessions;
		int i;

		mutex_lock(&mdsc->mutex);
		max_sessions = mdsc->max_sessions;

		sessions = kcalloc(max_sessions, sizeof(s), GFP_KERNEL);
		if (!sessions) {
			mutex_unlock(&mdsc->mutex);
			err = -ENOMEM;
			goto out;
		}

		spin_lock(&ci->i_unsafe_lock);
		if (req1) {
			list_for_each_entry(req, &ci->i_unsafe_dirops,
					    r_unsafe_dir_item) {
				s = req->r_session;
				if (!s)
					continue;
				if (!sessions[s->s_mds]) {
					s = ceph_get_mds_session(s);
					sessions[s->s_mds] = s;
				}
			}
		}
		if (req2) {
			list_for_each_entry(req, &ci->i_unsafe_iops,
					    r_unsafe_target_item) {
				s = req->r_session;
				if (!s)
					continue;
				if (!sessions[s->s_mds]) {
					s = ceph_get_mds_session(s);
					sessions[s->s_mds] = s;
				}
			}
		}
		spin_unlock(&ci->i_unsafe_lock);

		/* the auth MDS */
		spin_lock(&ci->i_ceph_lock);
		if (ci->i_auth_cap) {
			s = ci->i_auth_cap->session;
			if (!sessions[s->s_mds])
				sessions[s->s_mds] = ceph_get_mds_session(s);
		}
		spin_unlock(&ci->i_ceph_lock);
		mutex_unlock(&mdsc->mutex);

		/* send flush mdlog request to MDSes */
		for (i = 0; i < max_sessions; i++) {
			s = sessions[i];
			if (s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(s);
			}
		}
		kfree(sessions);
	}

	doutc(cl, "%p %llx.%llx wait on tid %llu %llu\n", inode,
	      ceph_vinop(inode), req1 ? req1->r_tid : 0ULL,
	      req2 ? req2->r_tid : 0ULL);
	if (req1) {
		ret = !wait_for_completion_timeout(&req1->r_safe_completion,
					ceph_timeout_jiffies(req1->r_timeout));
		if (ret)
			err = -EIO;
	}
	if (req2) {
		ret = !wait_for_completion_timeout(&req2->r_safe_completion,
					ceph_timeout_jiffies(req2->r_timeout));
		if (ret)
			err = -EIO;
	}

out:
	if (req1)
		ceph_mdsc_put_request(req1);
	if (req2)
		ceph_mdsc_put_request(req2);
	return err;
}
int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{
	struct inode *inode = file->f_mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	u64 flush_tid;
	int ret, err;
	int dirty;

	doutc(cl, "%p %llx.%llx%s\n", inode, ceph_vinop(inode),
	      datasync ? " datasync" : "");

	ret = file_write_and_wait_range(file, start, end);
	if (datasync)
		goto out;

	ret = ceph_wait_on_async_create(inode);
	if (ret)
		goto out;

	dirty = try_flush_caps(inode, &flush_tid);
	doutc(cl, "dirty caps are %s\n", ceph_cap_string(dirty));

	err = flush_mdlog_and_wait_inode_unsafe_requests(inode);

	/*
	 * only wait on non-file metadata writeback (the mds
	 * can recover size and mtime, so we don't need to
	 * wait for that)
	 */
	if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
		err = wait_event_interruptible(ci->i_cap_wq,
					caps_are_flushed(inode, flush_tid));
	}

	if (err < 0)
		ret = err;

	err = file_check_and_advance_wb_err(file);
	if (err < 0)
		ret = err;
out:
	doutc(cl, "%p %llx.%llx%s result=%d\n", inode, ceph_vinop(inode),
	      datasync ? " datasync" : "", ret);
	return ret;
}
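
/*
 * A rough sketch of what a caller observes here: fsync(2) lands in this
 * function via the VFS, data pages are written back first, then dirty caps
 * are flushed and we block until caps_are_flushed() reports that flush_tid
 * has been committed by the MDS.  File size/mtime are exempt from the wait,
 * since the MDS can recover those on its own.
 */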
/*
 * Flush any dirty caps back to the mds. If we aren't asked to wait,
 * queue inode for flush but don't do so immediately, because we can
 * get by with fewer MDS messages if we wait for data writeback to
 * complete first.
 */
int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	u64 flush_tid;
	int err = 0;
	int dirty;
	int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);

	doutc(cl, "%p %llx.%llx wait=%d\n", inode, ceph_vinop(inode), wait);
	ceph_fscache_unpin_writeback(inode, wbc);
	if (wait) {
		err = ceph_wait_on_async_create(inode);
		if (err)
			return err;
		dirty = try_flush_caps(inode, &flush_tid);
		if (dirty)
			err = wait_event_interruptible(ci->i_cap_wq,
				       caps_are_flushed(inode, flush_tid));
	} else {
		struct ceph_mds_client *mdsc =
			ceph_sb_to_fs_client(inode->i_sb)->mdsc;

		spin_lock(&ci->i_ceph_lock);
		if (__ceph_caps_dirty(ci))
			__cap_delay_requeue_front(mdsc, ci);
		spin_unlock(&ci->i_ceph_lock);
	}
	return err;
}
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session,
				 struct ceph_inode_info *ci,
				 u64 oldest_flush_tid)
	__releases(ci->i_ceph_lock)
	__acquires(ci->i_ceph_lock)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap *cap;
	struct ceph_cap_flush *cf;
	int ret;
	u64 first_tid = 0;
	u64 last_snap_flush = 0;

	/* Don't do anything until create reply comes in */
	if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE)
		return;

	ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;

	list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
		if (cf->is_capsnap) {
			last_snap_flush = cf->tid;
			break;
		}
	}

	list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
		if (cf->tid < first_tid)
			continue;

		cap = ci->i_auth_cap;
		if (!(cap && cap->session == session)) {
			pr_err_client(cl, "%p auth cap %p not mds%d ???\n",
				      inode, cap, session->s_mds);
			break;
		}

		first_tid = cf->tid + 1;

		if (!cf->is_capsnap) {
			struct cap_msg_args arg;

			doutc(cl, "%p %llx.%llx cap %p tid %llu %s\n",
			      inode, ceph_vinop(inode), cap, cf->tid,
			      ceph_cap_string(cf->caps));
			__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH,
				   (cf->tid < last_snap_flush ?
				    CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
				   __ceph_caps_used(ci),
				   __ceph_caps_wanted(ci),
				   (cap->issued | cap->implemented),
				   cf->caps, cf->tid, oldest_flush_tid);
			spin_unlock(&ci->i_ceph_lock);
			__send_cap(&arg, ci);
		} else {
			struct ceph_cap_snap *capsnap =
					container_of(cf, struct ceph_cap_snap,
						     cap_flush);
			doutc(cl, "%p %llx.%llx capsnap %p tid %llu %s\n",
			      inode, ceph_vinop(inode), capsnap, cf->tid,
			      ceph_cap_string(capsnap->dirty));

			refcount_inc(&capsnap->nref);
			spin_unlock(&ci->i_ceph_lock);

			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
						oldest_flush_tid);
			if (ret < 0) {
				pr_err_client(cl, "error sending cap flushsnap,"
					      " %p %llx.%llx tid %llu follows %llu\n",
					      inode, ceph_vinop(inode), cf->tid,
					      capsnap->follows);
			}

			ceph_put_cap_snap(capsnap);
		}

		spin_lock(&ci->i_ceph_lock);
	}
}
void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	u64 oldest_flush_tid;

	doutc(cl, "mds%d\n", session->s_mds);

	spin_lock(&mdsc->cap_dirty_lock);
	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
	spin_unlock(&mdsc->cap_dirty_lock);

	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
		struct inode *inode = &ci->netfs.inode;

		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (!(cap && cap->session == session)) {
			pr_err_client(cl, "%p %llx.%llx auth cap %p not mds%d ???\n",
				      inode, ceph_vinop(inode), cap,
				      session->s_mds);
			spin_unlock(&ci->i_ceph_lock);
			continue;
		}

		/*
		 * if flushing caps were revoked, we re-send the cap flush
		 * in client reconnect stage. This guarantees the MDS
		 * processes the cap flush message before issuing the
		 * flushing caps to another client.
		 */
		if ((cap->issued & ci->i_flushing_caps) !=
		    ci->i_flushing_caps) {
			/* encode_caps_cb() also will reset these sequence
			 * numbers. make sure sequence numbers in cap flush
			 * message match later reconnect message */
			cap->seq = 0;
			cap->issue_seq = 0;
			cap->mseq = 0;
			__kick_flushing_caps(mdsc, session, ci,
					     oldest_flush_tid);
		} else {
			ci->i_ceph_flags |= CEPH_I_KICK_FLUSH;
		}

		spin_unlock(&ci->i_ceph_lock);
	}
}
void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	u64 oldest_flush_tid;

	lockdep_assert_held(&session->s_mutex);

	doutc(cl, "mds%d\n", session->s_mds);

	spin_lock(&mdsc->cap_dirty_lock);
	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
	spin_unlock(&mdsc->cap_dirty_lock);

	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
		struct inode *inode = &ci->netfs.inode;

		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (!(cap && cap->session == session)) {
			pr_err_client(cl, "%p %llx.%llx auth cap %p not mds%d ???\n",
				      inode, ceph_vinop(inode), cap,
				      session->s_mds);
			spin_unlock(&ci->i_ceph_lock);
			continue;
		}
		if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
			__kick_flushing_caps(mdsc, session, ci,
					     oldest_flush_tid);
		}
		spin_unlock(&ci->i_ceph_lock);
	}
}
void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session,
				   struct ceph_inode_info *ci)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_cap *cap = ci->i_auth_cap;
	struct inode *inode = &ci->netfs.inode;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(mdsc->fsc->client, "%p %llx.%llx flushing %s\n",
	      inode, ceph_vinop(inode),
	      ceph_cap_string(ci->i_flushing_caps));

	if (!list_empty(&ci->i_cap_flush_list)) {
		u64 oldest_flush_tid;
		spin_lock(&mdsc->cap_dirty_lock);
		list_move_tail(&ci->i_flushing_item,
			       &cap->session->s_cap_flushing);
		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
		spin_unlock(&mdsc->cap_dirty_lock);

		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
	}
}
/*
 * Take references to capabilities we hold, so that we don't release
 * them to the MDS prematurely.
 */
void ceph_take_cap_refs(struct ceph_inode_info *ci, int got,
			bool snap_rwsem_locked)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);

	lockdep_assert_held(&ci->i_ceph_lock);

	if (got & CEPH_CAP_PIN)
		ci->i_pin_ref++;
	if (got & CEPH_CAP_FILE_RD)
		ci->i_rd_ref++;
	if (got & CEPH_CAP_FILE_CACHE)
		ci->i_rdcache_ref++;
	if (got & CEPH_CAP_FILE_EXCL)
		ci->i_fx_ref++;
	if (got & CEPH_CAP_FILE_WR) {
		if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
			BUG_ON(!snap_rwsem_locked);
			ci->i_head_snapc = ceph_get_snap_context(
					ci->i_snap_realm->cached_context);
		}
		ci->i_wr_ref++;
	}
	if (got & CEPH_CAP_FILE_BUFFER) {
		if (ci->i_wb_ref == 0)
			ihold(inode);
		ci->i_wb_ref++;
		doutc(cl, "%p %llx.%llx wb %d -> %d (?)\n", inode,
		      ceph_vinop(inode), ci->i_wb_ref-1, ci->i_wb_ref);
	}
}
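
/*
 * Each cap bit maps to its own reference counter: PIN -> i_pin_ref,
 * FILE_RD -> i_rd_ref, FILE_CACHE -> i_rdcache_ref, FILE_EXCL -> i_fx_ref,
 * FILE_WR -> i_wr_ref and FILE_BUFFER -> i_wb_ref (the latter also pins the
 * inode itself via ihold).  The first WR ref additionally pins the current
 * snap context, which is why the caller must hold snap_rwsem in that case.
 */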
/*
 * Try to grab cap references. Specify those refs we @want, and the
 * minimal set we @need. Also include the larger offset we are writing
 * to (when applicable), and check against max_size here as well.
 * Note that caller is responsible for ensuring max_size increases are
 * requested from the MDS.
 *
 * Returns 0 if caps were not able to be acquired (yet), 1 if succeed,
 * or a negative error code. There are 3 special error codes:
 *  -EAGAIN:  need to sleep but non-blocking is specified
 *  -EFBIG:   ask caller to call check_max_size() and try again.
 *  -EUCLEAN: ask caller to call ceph_renew_caps() and try again.
 */
enum {
	/* first 8 bits are reserved for CEPH_FILE_MODE_FOO */
	NON_BLOCKING	= (1 << 8),
	CHECK_FILELOCK	= (1 << 9),
};
static int try_get_cap_refs(struct inode *inode, int need, int want,
			    loff_t endoff, int flags, int *got)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int ret = 0;
	int have, implemented;
	bool snap_rwsem_locked = false;

	doutc(cl, "%p %llx.%llx need %s want %s\n", inode,
	      ceph_vinop(inode), ceph_cap_string(need),
	      ceph_cap_string(want));

again:
	spin_lock(&ci->i_ceph_lock);

	if ((flags & CHECK_FILELOCK) &&
	    (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK)) {
		doutc(cl, "%p %llx.%llx error filelock\n", inode,
		      ceph_vinop(inode));
		ret = -EIO;
		goto out_unlock;
	}

	/* finish pending truncate */
	while (ci->i_truncate_pending) {
		spin_unlock(&ci->i_ceph_lock);
		if (snap_rwsem_locked) {
			up_read(&mdsc->snap_rwsem);
			snap_rwsem_locked = false;
		}
		__ceph_do_pending_vmtruncate(inode);
		spin_lock(&ci->i_ceph_lock);
	}

	have = __ceph_caps_issued(ci, &implemented);

	if (have & need & CEPH_CAP_FILE_WR) {
		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
			doutc(cl, "%p %llx.%llx endoff %llu > maxsize %llu\n",
			      inode, ceph_vinop(inode), endoff, ci->i_max_size);
			if (endoff > ci->i_requested_max_size)
				ret = ci->i_auth_cap ? -EFBIG : -EUCLEAN;
			goto out_unlock;
		}
		/*
		 * If a sync write is in progress, we must wait, so that we
		 * can get a final snapshot value for size+mtime.
		 */
		if (__ceph_have_pending_cap_snap(ci)) {
			doutc(cl, "%p %llx.%llx cap_snap_pending\n", inode,
			      ceph_vinop(inode));
			goto out_unlock;
		}
	}

	if ((have & need) == need) {
		/*
		 * Look at (implemented & ~have & not) so that we keep waiting
		 * on transition from wanted -> needed caps. This is needed
		 * for WRBUFFER|WR -> WR to avoid a new WR sync write from
		 * going before a prior buffered writeback happens.
		 *
		 * For RDCACHE|RD -> RD, there is no need to wait and we can
		 * just exclude the revoking caps and force to sync read.
		 */
		int not = want & ~(have & need);
		int revoking = implemented & ~have;
		int exclude = revoking & not;
		doutc(cl, "%p %llx.%llx have %s but not %s (revoking %s)\n",
		      inode, ceph_vinop(inode), ceph_cap_string(have),
		      ceph_cap_string(not), ceph_cap_string(revoking));
		if (!exclude || !(exclude & CEPH_CAP_FILE_BUFFER)) {
			if (!snap_rwsem_locked &&
			    !ci->i_head_snapc &&
			    (need & CEPH_CAP_FILE_WR)) {
				if (!down_read_trylock(&mdsc->snap_rwsem)) {
					/*
					 * we can not call down_read() when
					 * task isn't in TASK_RUNNING state
					 */
					if (flags & NON_BLOCKING) {
						ret = -EAGAIN;
						goto out_unlock;
					}

					spin_unlock(&ci->i_ceph_lock);
					down_read(&mdsc->snap_rwsem);
					snap_rwsem_locked = true;
					goto again;
				}
				snap_rwsem_locked = true;
			}
			if ((have & want) == want)
				*got = need | (want & ~exclude);
			else
				*got = need;
			ceph_take_cap_refs(ci, *got, true);
			ret = 1;
		}
	} else {
		int session_readonly = false;
		int mds_wanted;
		if (ci->i_auth_cap &&
		    (need & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_EXCL))) {
			struct ceph_mds_session *s = ci->i_auth_cap->session;
			spin_lock(&s->s_cap_lock);
			session_readonly = s->s_readonly;
			spin_unlock(&s->s_cap_lock);
		}
		if (session_readonly) {
			doutc(cl, "%p %llx.%llx need %s but mds%d readonly\n",
			      inode, ceph_vinop(inode), ceph_cap_string(need),
			      ci->i_auth_cap->mds);
			ret = -EROFS;
			goto out_unlock;
		}

		if (ceph_inode_is_shutdown(inode)) {
			doutc(cl, "%p %llx.%llx inode is shutdown\n",
			      inode, ceph_vinop(inode));
			ret = -ESTALE;
			goto out_unlock;
		}
		mds_wanted = __ceph_caps_mds_wanted(ci, false);
		if (need & ~mds_wanted) {
			doutc(cl, "%p %llx.%llx need %s > mds_wanted %s\n",
			      inode, ceph_vinop(inode), ceph_cap_string(need),
			      ceph_cap_string(mds_wanted));
			ret = -EUCLEAN;
			goto out_unlock;
		}

		doutc(cl, "%p %llx.%llx have %s need %s\n", inode,
		      ceph_vinop(inode), ceph_cap_string(have),
		      ceph_cap_string(need));
	}
out_unlock:

	__ceph_touch_fmode(ci, mdsc, flags);

	spin_unlock(&ci->i_ceph_lock);
	if (snap_rwsem_locked)
		up_read(&mdsc->snap_rwsem);

	if (!ret)
		ceph_update_cap_mis(&mdsc->metric);
	else if (ret == 1)
		ceph_update_cap_hit(&mdsc->metric);

	doutc(cl, "%p %llx.%llx ret %d got %s\n", inode,
	      ceph_vinop(inode), ret, ceph_cap_string(*got));
	return ret;
}
/*
 * Check the offset we are writing up to against our current
 * max_size. If necessary, tell the MDS we want to write to
 * a larger offset.
 */
static void check_max_size(struct inode *inode, loff_t endoff)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int check = 0;

	/* do we need to explicitly request a larger max_size? */
	spin_lock(&ci->i_ceph_lock);
	if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
		doutc(cl, "write %p %llx.%llx at large endoff %llu, req max_size\n",
		      inode, ceph_vinop(inode), endoff);
		ci->i_wanted_max_size = endoff;
	}
	/* duplicate ceph_check_caps()'s logic */
	if (ci->i_auth_cap &&
	    (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
	    ci->i_wanted_max_size > ci->i_max_size &&
	    ci->i_wanted_max_size > ci->i_requested_max_size)
		check = 1;
	spin_unlock(&ci->i_ceph_lock);
	if (check)
		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY);
}
static inline int get_used_fmode(int caps)
{
	int fmode = 0;
	if (caps & CEPH_CAP_FILE_RD)
		fmode |= CEPH_FILE_MODE_RD;
	if (caps & CEPH_CAP_FILE_WR)
		fmode |= CEPH_FILE_MODE_WR;
	return fmode;
}
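
/*
 * For example, need == CEPH_CAP_FILE_WR with want == CEPH_CAP_FILE_BUFFER
 * yields CEPH_FILE_MODE_WR: only the Fr/Fw bits select a file mode, the
 * cache/buffer bits do not.
 */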
int ceph_try_get_caps(struct inode *inode, int need, int want,
		      bool nonblock, int *got)
{
	int ret, flags;

	BUG_ON(need & ~CEPH_CAP_FILE_RD);
	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO |
			CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
			CEPH_CAP_ANY_DIR_OPS));
	if (need) {
		ret = ceph_pool_perm_check(inode, need);
		if (ret < 0)
			return ret;
	}

	flags = get_used_fmode(need | want);
	if (nonblock)
		flags |= NON_BLOCKING;

	ret = try_get_cap_refs(inode, need, want, 0, flags, got);
	/* three special error codes */
	if (ret == -EAGAIN || ret == -EFBIG || ret == -EUCLEAN)
		ret = 0;
	return ret;
}
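
/*
 * ceph_try_get_caps() is the opportunistic front end: the three special
 * error codes from try_get_cap_refs() are squashed to 0 here, so a caller
 * (the readahead path, for instance) sees a simple "got them or didn't"
 * answer and never sleeps when nonblock is set.
 */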
/*
 * Wait for caps, and take cap references. If we can't get a WR cap
 * due to a small max_size, make sure we check_max_size (and possibly
 * ask the mds) so we don't get hung up indefinitely.
 */
int __ceph_get_caps(struct inode *inode, struct ceph_file_info *fi, int need,
		    int want, loff_t endoff, int *got)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	int ret, _got, flags;

	ret = ceph_pool_perm_check(inode, need);
	if (ret < 0)
		return ret;

	if (fi && (fi->fmode & CEPH_FILE_MODE_WR) &&
	    fi->filp_gen != READ_ONCE(fsc->filp_gen))
		return -EBADF;

	flags = get_used_fmode(need | want);

	while (true) {
		flags &= CEPH_FILE_MODE_MASK;
		if (vfs_inode_has_locks(inode))
			flags |= CHECK_FILELOCK;
		_got = 0;
		ret = try_get_cap_refs(inode, need, want, endoff,
				       flags, &_got);
		WARN_ON_ONCE(ret == -EAGAIN);
		if (!ret) {
#ifdef CONFIG_DEBUG_FS
			struct ceph_mds_client *mdsc = fsc->mdsc;
			struct cap_wait cw;
#endif
			DEFINE_WAIT_FUNC(wait, woken_wake_function);

#ifdef CONFIG_DEBUG_FS
			cw.ino = ceph_ino(inode);
			cw.tgid = current->tgid;
			cw.need = need;
			cw.want = want;

			spin_lock(&mdsc->caps_list_lock);
			list_add(&cw.list, &mdsc->cap_wait_list);
			spin_unlock(&mdsc->caps_list_lock);
#endif

			/* make sure used fmode not timeout */
			ceph_get_fmode(ci, flags, FMODE_WAIT_BIAS);
			add_wait_queue(&ci->i_cap_wq, &wait);

			flags |= NON_BLOCKING;
			while (!(ret = try_get_cap_refs(inode, need, want,
							endoff, flags, &_got))) {
				if (signal_pending(current)) {
					ret = -ERESTARTSYS;
					break;
				}
				wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
			}

			remove_wait_queue(&ci->i_cap_wq, &wait);
			ceph_put_fmode(ci, flags, FMODE_WAIT_BIAS);

#ifdef CONFIG_DEBUG_FS
			spin_lock(&mdsc->caps_list_lock);
			list_del(&cw.list);
			spin_unlock(&mdsc->caps_list_lock);
#endif

			if (ret == -EAGAIN)
				continue;
		}

		if (fi && (fi->fmode & CEPH_FILE_MODE_WR) &&
		    fi->filp_gen != READ_ONCE(fsc->filp_gen)) {
			if (ret >= 0 && _got)
				ceph_put_cap_refs(ci, _got);
			return -EBADF;
		}

		if (ret < 0) {
			if (ret == -EFBIG || ret == -EUCLEAN) {
				int ret2 = ceph_wait_on_async_create(inode);
				if (ret2 < 0)
					return ret2;
			}
			if (ret == -EFBIG) {
				check_max_size(inode, endoff);
				continue;
			}
			if (ret == -EUCLEAN) {
				/* session was killed, try renew caps */
				ret = ceph_renew_caps(inode, flags);
				if (ret == 0)
					continue;
			}
			return ret;
		}

		if (S_ISREG(ci->netfs.inode.i_mode) &&
		    ceph_has_inline_data(ci) &&
		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
		    i_size_read(inode) > 0) {
			struct page *page =
				find_get_page(inode->i_mapping, 0);
			if (page) {
				bool uptodate = PageUptodate(page);

				put_page(page);
				if (uptodate)
					break;
			}
			/*
			 * drop cap refs first because getattr while
			 * holding caps refs can cause deadlock.
			 */
			ceph_put_cap_refs(ci, _got);
			_got = 0;

			/*
			 * getattr request will bring inline data into
			 * page cache
			 */
			ret = __ceph_do_getattr(inode, NULL,
						CEPH_STAT_CAP_INLINE_DATA,
						true);
			if (ret < 0)
				return ret;
			continue;
		}
		break;
	}
	*got = _got;
	return 0;
}
int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff,
		  int *got)
{
	struct ceph_file_info *fi = filp->private_data;
	struct inode *inode = file_inode(filp);

	return __ceph_get_caps(inode, fi, need, want, endoff, got);
}
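
/*
 * A minimal sketch of the usual caller pattern (the read path does roughly
 * this; the surrounding locals are illustrative):
 *
 *	int got = 0;
 *	int want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 *	err = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1, &got);
 *	if (err < 0)
 *		return err;
 *	... do the read, honouring whatever "got" allows ...
 *	ceph_put_cap_refs(ceph_inode(inode), got);
 */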
/*
 * Take cap refs. Caller must already know we hold at least one ref
 * on the caps in question or we don't know this is safe.
 */
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
{
	spin_lock(&ci->i_ceph_lock);
	ceph_take_cap_refs(ci, caps, false);
	spin_unlock(&ci->i_ceph_lock);
}
/*
 * drop cap_snap that is not associated with any snapshot.
 * we don't need to send FLUSHSNAP message for it.
 */
static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
				  struct ceph_cap_snap *capsnap)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);

	if (!capsnap->need_flush &&
	    !capsnap->writing && !capsnap->dirty_pages) {
		doutc(cl, "%p follows %llu\n", capsnap, capsnap->follows);
		BUG_ON(capsnap->cap_flush.tid > 0);
		ceph_put_snap_context(capsnap->context);
		if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps))
			ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;

		list_del(&capsnap->ci_item);
		ceph_put_cap_snap(capsnap);
		return 1;
	}
	return 0;
}
enum put_cap_refs_mode {
	PUT_CAP_REFS_SYNC = 0,
	PUT_CAP_REFS_ASYNC,
};
/*
 * Release cap refs.
 *
 * If we released the last ref on any given cap, call ceph_check_caps
 * to release (or schedule a release).
 *
 * If we are releasing a WR cap (from a sync write), finalize any affected
 * cap_snap, and wake up any waiters.
 */
static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
				enum put_cap_refs_mode mode)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int last = 0, put = 0, flushsnaps = 0, wake = 0;
	bool check_flushsnaps = false;

	spin_lock(&ci->i_ceph_lock);
	if (had & CEPH_CAP_PIN)
		--ci->i_pin_ref;
	if (had & CEPH_CAP_FILE_RD)
		if (--ci->i_rd_ref == 0)
			last++;
	if (had & CEPH_CAP_FILE_CACHE)
		if (--ci->i_rdcache_ref == 0)
			last++;
	if (had & CEPH_CAP_FILE_EXCL)
		if (--ci->i_fx_ref == 0)
			last++;
	if (had & CEPH_CAP_FILE_BUFFER) {
		if (--ci->i_wb_ref == 0) {
			last++;
			/* put the ref held by ceph_take_cap_refs() */
			put++;
			check_flushsnaps = true;
		}
		doutc(cl, "%p %llx.%llx wb %d -> %d (?)\n", inode,
		      ceph_vinop(inode), ci->i_wb_ref+1, ci->i_wb_ref);
	}
	if (had & CEPH_CAP_FILE_WR) {
		if (--ci->i_wr_ref == 0) {
			/*
			 * The Fb caps are always taken and released
			 * together with the Fw caps.
			 */
			WARN_ON_ONCE(ci->i_wb_ref);

			last++;
			check_flushsnaps = true;
			if (ci->i_wrbuffer_ref_head == 0 &&
			    ci->i_dirty_caps == 0 &&
			    ci->i_flushing_caps == 0) {
				BUG_ON(!ci->i_head_snapc);
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
			/* see comment in __ceph_remove_cap() */
			if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
				ceph_change_snap_realm(inode, NULL);
		}
	}
	if (check_flushsnaps && __ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
			list_last_entry(&ci->i_cap_snaps,
					struct ceph_cap_snap,
					ci_item);

		capsnap->writing = 0;
		if (ceph_try_drop_cap_snap(ci, capsnap))
			/* put the ref held by ceph_queue_cap_snap() */
			put++;
		else if (__ceph_finish_cap_snap(ci, capsnap))
			flushsnaps = 1;
		wake = 1;
	}
	spin_unlock(&ci->i_ceph_lock);

	doutc(cl, "%p %llx.%llx had %s%s%s\n", inode, ceph_vinop(inode),
	      ceph_cap_string(had), last ? " last" : "", put ? " put" : "");

	switch (mode) {
	case PUT_CAP_REFS_SYNC:
		if (last)
			ceph_check_caps(ci, 0);
		else if (flushsnaps)
			ceph_flush_snaps(ci, NULL);
		break;
	case PUT_CAP_REFS_ASYNC:
		if (last)
			ceph_queue_check_caps(inode);
		else if (flushsnaps)
			ceph_queue_flush_snaps(inode);
		break;
	default:
		break;
	}
	if (wake)
		wake_up_all(&ci->i_cap_wq);
	while (put-- > 0)
		iput(inode);
}

void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
{
	__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_SYNC);
}

void ceph_put_cap_refs_async(struct ceph_inode_info *ci, int had)
{
	__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_ASYNC);
}
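
/*
 * The async variant exists for contexts where calling ceph_check_caps()
 * directly could deadlock (it may take i_ceph_lock and send cap messages);
 * PUT_CAP_REFS_ASYNC defers that work to a workqueue via
 * ceph_queue_check_caps()/ceph_queue_flush_snaps() instead.
 */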
/*
 * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
 * context. Adjust per-snap dirty page accounting as appropriate.
 * Once all dirty data for a cap_snap is flushed, flush snapped file
 * metadata back to the MDS. If we dropped the last ref, call
 * ceph_check_caps.
 */
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
				struct ceph_snap_context *snapc)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap_snap *capsnap = NULL, *iter;
	int put = 0;
	bool last = false;
	bool flush_snaps = false;
	bool complete_capsnap = false;

	spin_lock(&ci->i_ceph_lock);
	ci->i_wrbuffer_ref -= nr;
	if (ci->i_wrbuffer_ref == 0) {
		last = true;
		put++;
	}

	if (ci->i_head_snapc == snapc) {
		ci->i_wrbuffer_ref_head -= nr;
		if (ci->i_wrbuffer_ref_head == 0 &&
		    ci->i_wr_ref == 0 &&
		    ci->i_dirty_caps == 0 &&
		    ci->i_flushing_caps == 0) {
			BUG_ON(!ci->i_head_snapc);
			ceph_put_snap_context(ci->i_head_snapc);
			ci->i_head_snapc = NULL;
		}
		doutc(cl, "on %p %llx.%llx head %d/%d -> %d/%d %s\n",
		      inode, ceph_vinop(inode), ci->i_wrbuffer_ref+nr,
		      ci->i_wrbuffer_ref_head+nr, ci->i_wrbuffer_ref,
		      ci->i_wrbuffer_ref_head, last ? " LAST" : "");
	} else {
		list_for_each_entry(iter, &ci->i_cap_snaps, ci_item) {
			if (iter->context == snapc) {
				capsnap = iter;
				break;
			}
		}

		if (!capsnap) {
			/*
			 * The capsnap should already be removed when removing
			 * auth cap in the case of a forced unmount.
			 */
			WARN_ON_ONCE(ci->i_auth_cap);
			goto unlock;
		}

		capsnap->dirty_pages -= nr;
		if (capsnap->dirty_pages == 0) {
			complete_capsnap = true;
			if (!capsnap->writing) {
				if (ceph_try_drop_cap_snap(ci, capsnap)) {
					put++;
				} else {
					ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
					flush_snaps = true;
				}
			}
		}
		doutc(cl, "%p %llx.%llx cap_snap %p snap %lld %d/%d -> %d/%d %s%s\n",
		      inode, ceph_vinop(inode), capsnap, capsnap->context->seq,
		      ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
		      ci->i_wrbuffer_ref, capsnap->dirty_pages,
		      last ? " (wrbuffer last)" : "",
		      complete_capsnap ? " (complete capsnap)" : "");
	}

unlock:
	spin_unlock(&ci->i_ceph_lock);

	if (last) {
		ceph_check_caps(ci, 0);
	} else if (flush_snaps) {
		ceph_flush_snaps(ci, NULL);
	}
	if (complete_capsnap)
		wake_up_all(&ci->i_cap_wq);
	while (put-- > 0)
		iput(inode);
}
/*
 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
 */
static void invalidate_aliases(struct inode *inode)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct dentry *dn, *prev = NULL;

	doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
	d_prune_aliases(inode);
	/*
	 * For non-directory inode, d_find_alias() only returns
	 * hashed dentry. After calling d_invalidate(), the
	 * dentry becomes unhashed.
	 *
	 * For directory inode, d_find_alias() can return
	 * unhashed dentry. But directory inode should have
	 * one alias at most.
	 */
	while ((dn = d_find_alias(inode))) {
		if (dn == prev) {
			dput(dn);
			break;
		}
		d_invalidate(dn);
		if (prev)
			dput(prev);
		prev = dn;
	}
	if (prev)
		dput(prev);
}
struct cap_extra_info {
	struct ceph_string *pool_ns;
	/* inline data */
	u64 inline_version;
	void *inline_data;
	u32 inline_len;
	/* dirstat */
	bool dirstat_valid;
	u64 nfiles;
	u64 nsubdirs;
	u64 change_attr;
	/* currently issued */
	int issued;
	struct timespec64 btime;
	u8 *fscrypt_auth;
	u32 fscrypt_auth_len;
	u64 fscrypt_file_size;
};
/*
 * Handle a cap GRANT message from the MDS. (Note that a GRANT may
 * actually be a revocation if it specifies a smaller cap set.)
 *
 * caller holds s_mutex and i_ceph_lock, we drop both.
 */
static void handle_cap_grant(struct inode *inode,
			     struct ceph_mds_session *session,
			     struct ceph_cap *cap,
			     struct ceph_mds_caps *grant,
			     struct ceph_buffer *xattr_buf,
			     struct cap_extra_info *extra_info)
	__releases(ci->i_ceph_lock)
	__releases(session->s_mdsc->snap_rwsem)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	int seq = le32_to_cpu(grant->seq);
	int newcaps = le32_to_cpu(grant->caps);
	int used, wanted, dirty;
	u64 size = le64_to_cpu(grant->size);
	u64 max_size = le64_to_cpu(grant->max_size);
	unsigned char check_caps = 0;
	bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen);
	bool wake = false;
	bool writeback = false;
	bool queue_trunc = false;
	bool queue_invalidate = false;
	bool deleted_inode = false;
	bool fill_inline = false;
	bool revoke_wait = false;
	int flags = 0;

	/*
	 * If there is at least one crypto block then we'll trust
	 * fscrypt_file_size. If the real length of the file is 0, then
	 * ignore it (it has probably been truncated down to 0 by the MDS).
	 */
	if (IS_ENCRYPTED(inode) && size)
		size = extra_info->fscrypt_file_size;

	doutc(cl, "%p %llx.%llx cap %p mds%d seq %d %s\n", inode,
	      ceph_vinop(inode), cap, session->s_mds, seq,
	      ceph_cap_string(newcaps));
	doutc(cl, " size %llu max_size %llu, i_size %llu\n", size,
	      max_size, i_size_read(inode));

	/*
	 * If CACHE is being revoked, and we have no dirty buffers,
	 * try to invalidate (once). (If there are dirty buffers, we
	 * will invalidate _after_ writeback.)
	 */
	if (S_ISREG(inode->i_mode) && /* don't invalidate readdir cache */
	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
	    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
		if (try_nonblocking_invalidate(inode)) {
			/* there were locked pages.. invalidate later
			   in a separate thread. */
			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
				queue_invalidate = true;
				ci->i_rdcache_revoking = ci->i_rdcache_gen;
			}
		}
	}

	if (was_stale)
		cap->issued = cap->implemented = CEPH_CAP_PIN;

	/*
	 * auth mds of the inode changed. we received the cap export message,
	 * but still haven't received the cap import message. handle_cap_export
	 * updated the new auth MDS' cap.
	 *
	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
	 * that was sent before the cap import message. So don't remove caps.
	 */
	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
		WARN_ON(cap != ci->i_auth_cap);
		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
		seq = cap->seq;
		newcaps |= cap->issued;
	}

	/* side effects now are allowed */
	cap->cap_gen = atomic_read(&session->s_cap_gen);
	cap->seq = seq;

	__check_cap_issue(ci, cap, newcaps);

	inode_set_max_iversion_raw(inode, extra_info->change_attr);

	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
	    (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
		umode_t mode = le32_to_cpu(grant->mode);

		if (inode_wrong_type(inode, mode))
			pr_warn_once("inode type changed! (ino %llx.%llx is 0%o, mds says 0%o)\n",
				     ceph_vinop(inode), inode->i_mode, mode);
		else {
			inode->i_mode = mode;
			inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
			inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
			ci->i_btime = extra_info->btime;
			doutc(cl, "%p %llx.%llx mode 0%o uid.gid %d.%d\n", inode,
			      ceph_vinop(inode), inode->i_mode,
			      from_kuid(&init_user_ns, inode->i_uid),
			      from_kgid(&init_user_ns, inode->i_gid));
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
			if (ci->fscrypt_auth_len != extra_info->fscrypt_auth_len ||
			    memcmp(ci->fscrypt_auth, extra_info->fscrypt_auth,
				   ci->fscrypt_auth_len))
				pr_warn_ratelimited_client(cl,
					"cap grant attempt to change fscrypt_auth on non-I_NEW inode (old len %d new len %d)\n",
					ci->fscrypt_auth_len,
					extra_info->fscrypt_auth_len);
#endif
		}
	}

	if ((newcaps & CEPH_CAP_LINK_SHARED) &&
	    (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
		set_nlink(inode, le32_to_cpu(grant->nlink));
		if (inode->i_nlink == 0)
			deleted_inode = true;
	}

	if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
	    le32_to_cpu(grant->xattr_len)) {
		int len = le32_to_cpu(grant->xattr_len);
		u64 version = le64_to_cpu(grant->xattr_version);

		if (version > ci->i_xattrs.version) {
			doutc(cl, " got new xattrs v%llu on %p %llx.%llx len %d\n",
			      version, inode, ceph_vinop(inode), len);
			if (ci->i_xattrs.blob)
				ceph_buffer_put(ci->i_xattrs.blob);
			ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
			ci->i_xattrs.version = version;
			ceph_forget_all_cached_acls(inode);
			ceph_security_invalidate_secctx(inode);
		}
	}

	if (newcaps & CEPH_CAP_ANY_RD) {
		struct timespec64 mtime, atime, ctime;
		/* ctime/mtime/atime? */
		ceph_decode_timespec64(&mtime, &grant->mtime);
		ceph_decode_timespec64(&atime, &grant->atime);
		ceph_decode_timespec64(&ctime, &grant->ctime);
		ceph_fill_file_time(inode, extra_info->issued,
				    le32_to_cpu(grant->time_warp_seq),
				    &ctime, &mtime, &atime);
	}

	if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) {
		ci->i_files = extra_info->nfiles;
		ci->i_subdirs = extra_info->nsubdirs;
	}

	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
		/* file layout may have changed */
		s64 old_pool = ci->i_layout.pool_id;
		struct ceph_string *old_ns;

		ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
					lockdep_is_held(&ci->i_ceph_lock));
		rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);

		if (ci->i_layout.pool_id != old_pool ||
		    extra_info->pool_ns != old_ns)
			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;

		extra_info->pool_ns = old_ns;

		/* size/truncate_seq? */
		queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
					le32_to_cpu(grant->truncate_seq),
					le64_to_cpu(grant->truncate_size),
					size);
	}

	if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) {
		if (max_size != ci->i_max_size) {
			doutc(cl, "max_size %lld -> %llu\n", ci->i_max_size,
			      max_size);
			ci->i_max_size = max_size;
			if (max_size >= ci->i_wanted_max_size) {
				ci->i_wanted_max_size = 0;  /* reset */
				ci->i_requested_max_size = 0;
			}
			wake = true;
		}
	}

	/* check cap bits */
	wanted = __ceph_caps_wanted(ci);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);
	doutc(cl, " my wanted = %s, used = %s, dirty %s\n",
	      ceph_cap_string(wanted), ceph_cap_string(used),
	      ceph_cap_string(dirty));

	if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
	    (wanted & ~(cap->mds_wanted | newcaps))) {
		/*
		 * If mds is importing cap, prior cap messages that update
		 * 'wanted' may get dropped by mds (migrate seq mismatch).
		 *
		 * We don't send cap message to update 'wanted' if what we
		 * want are already issued. If mds revokes caps, cap message
		 * that releases caps also tells mds what we want. But if
		 * caps were revoked forcibly by the mds (stale session), we
		 * may not have told the mds what we want.
		 */
		check_caps = 1;
	}

	/* revocation, grant, or no-op? */
	if (cap->issued & ~newcaps) {
		int revoking = cap->issued & ~newcaps;

		doutc(cl, "revocation: %s -> %s (revoking %s)\n",
		      ceph_cap_string(cap->issued), ceph_cap_string(newcaps),
		      ceph_cap_string(revoking));
		if (S_ISREG(inode->i_mode) &&
		    (revoking & used & CEPH_CAP_FILE_BUFFER)) {
			writeback = true;  /* initiate writeback; will delay ack */
			revoke_wait = true;
		} else if (queue_invalidate &&
			 revoking == CEPH_CAP_FILE_CACHE &&
			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0) {
			revoke_wait = true; /* do nothing yet, invalidation will be queued */
		} else if (cap == ci->i_auth_cap) {
			check_caps = 1; /* check auth cap only */
		} else {
			check_caps = 2; /* check all caps */
		}
		/* If there are new caps, try to wake up the waiters */
		if (~cap->issued & newcaps)
			wake = true;
		cap->issued = newcaps;
		cap->implemented |= newcaps;
	} else if (cap->issued == newcaps) {
		doutc(cl, "caps unchanged: %s -> %s\n",
		      ceph_cap_string(cap->issued),
		      ceph_cap_string(newcaps));
	} else {
		doutc(cl, "grant: %s -> %s\n", ceph_cap_string(cap->issued),
		      ceph_cap_string(newcaps));
		/* non-auth MDS is revoking the newly granted caps? */
		if (cap == ci->i_auth_cap &&
		    __ceph_caps_revoking_other(ci, cap, newcaps))
			check_caps = 2;

		cap->issued = newcaps;
		cap->implemented |= newcaps; /* add bits only, to
					      * avoid stepping on a
					      * pending revocation */
		wake = true;
	}
	BUG_ON(cap->issued & ~cap->implemented);

	/* don't let check_caps skip sending a response to MDS for revoke msgs */
	if (!revoke_wait && le32_to_cpu(grant->op) == CEPH_CAP_OP_REVOKE) {
		cap->mds_wanted = 0;
		flags |= CHECK_CAPS_FLUSH_FORCE;
		if (cap == ci->i_auth_cap)
			check_caps = 1; /* check auth cap only */
		else
			check_caps = 2; /* check all caps */
	}

	if (extra_info->inline_version > 0 &&
	    extra_info->inline_version >= ci->i_inline_version) {
		ci->i_inline_version = extra_info->inline_version;
		if (ci->i_inline_version != CEPH_INLINE_NONE &&
		    (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
			fill_inline = true;
	}

	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
		if (ci->i_auth_cap == cap) {
			if (newcaps & ~extra_info->issued)
				wake = true;

			if (ci->i_requested_max_size > max_size ||
			    !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
				/* re-request max_size if necessary */
				ci->i_requested_max_size = 0;
				wake = true;
			}

			ceph_kick_flushing_inode_caps(session, ci);
		}
		up_read(&session->s_mdsc->snap_rwsem);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (fill_inline)
		ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
				      extra_info->inline_len);

	if (queue_trunc)
		ceph_queue_vmtruncate(inode);

	if (writeback)
		/*
		 * queue inode for writeback: we can't actually call
		 * filemap_write_and_wait, etc. from message handler
		 * context.
		 */
		ceph_queue_writeback(inode);
	if (queue_invalidate)
		ceph_queue_invalidate(inode);
	if (deleted_inode)
		invalidate_aliases(inode);
	if (wake)
		wake_up_all(&ci->i_cap_wq);

	mutex_unlock(&session->s_mutex);
	if (check_caps == 1)
		ceph_check_caps(ci, flags | CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL);
	else if (check_caps == 2)
		ceph_check_caps(ci, flags | CHECK_CAPS_NOINVAL);
}
/*
 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
 * MDS has been safely committed.
 */
static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
				 struct ceph_mds_caps *m,
				 struct ceph_mds_session *session,
				 struct ceph_cap *cap)
	__releases(ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap_flush *cf, *tmp_cf;
	LIST_HEAD(to_remove);
	unsigned seq = le32_to_cpu(m->seq);
	int dirty = le32_to_cpu(m->dirty);
	int cleaned = 0;
	bool drop = false;
	bool wake_ci = false;
	bool wake_mdsc = false;

	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
		/* Is this the one that was flushed? */
		if (cf->tid == flush_tid)
			cleaned = cf->caps;

		/* Is this a capsnap? */
		if (cf->is_capsnap)
			continue;

		if (cf->tid <= flush_tid) {
			/*
			 * An earlier or current tid. The FLUSH_ACK should
			 * represent a superset of this flush's caps.
			 */
			wake_ci |= __detach_cap_flush_from_ci(ci, cf);
			list_add_tail(&cf->i_list, &to_remove);
		} else {
			/*
			 * This is a later one. Any caps in it are still dirty
			 * so don't count them as cleaned.
			 */
			cleaned &= ~cf->caps;
			if (!cleaned)
				break;
		}
	}

	doutc(cl, "%p %llx.%llx mds%d seq %d on %s cleaned %s, flushing %s -> %s\n",
	      inode, ceph_vinop(inode), session->s_mds, seq,
	      ceph_cap_string(dirty), ceph_cap_string(cleaned),
	      ceph_cap_string(ci->i_flushing_caps),
	      ceph_cap_string(ci->i_flushing_caps & ~cleaned));

	if (list_empty(&to_remove) && !cleaned)
		goto out;

	ci->i_flushing_caps &= ~cleaned;

	spin_lock(&mdsc->cap_dirty_lock);

	list_for_each_entry(cf, &to_remove, i_list)
		wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc, cf);

	if (ci->i_flushing_caps == 0) {
		if (list_empty(&ci->i_cap_flush_list)) {
			list_del_init(&ci->i_flushing_item);
			if (!list_empty(&session->s_cap_flushing)) {
				struct inode *inode =
					&list_first_entry(&session->s_cap_flushing,
							  struct ceph_inode_info,
							  i_flushing_item)->netfs.inode;
				doutc(cl, " mds%d still flushing cap on %p %llx.%llx\n",
				      session->s_mds, inode, ceph_vinop(inode));
			}
		}
		mdsc->num_cap_flushing--;
		doutc(cl, " %p %llx.%llx now !flushing\n", inode,
		      ceph_vinop(inode));

		if (ci->i_dirty_caps == 0) {
			doutc(cl, " %p %llx.%llx now clean\n", inode,
			      ceph_vinop(inode));
			BUG_ON(!list_empty(&ci->i_dirty_item));
			drop = true;
			if (ci->i_wr_ref == 0 &&
			    ci->i_wrbuffer_ref_head == 0) {
				BUG_ON(!ci->i_head_snapc);
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		} else {
			BUG_ON(list_empty(&ci->i_dirty_item));
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);

out:
	spin_unlock(&ci->i_ceph_lock);

	while (!list_empty(&to_remove)) {
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del_init(&cf->i_list);
		if (!cf->is_capsnap)
			ceph_free_cap_flush(cf);
	}

	if (wake_ci)
		wake_up_all(&ci->i_cap_wq);
	if (wake_mdsc)
		wake_up_all(&mdsc->cap_flushing_wq);
	if (drop)
		iput(inode);
}
void __ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
			   bool *wake_ci, bool *wake_mdsc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	bool ret;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "removing capsnap %p, %p %llx.%llx ci %p\n", capsnap,
	      inode, ceph_vinop(inode), ci);

	list_del_init(&capsnap->ci_item);
	ret = __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
	if (wake_ci)
		*wake_ci = ret;

	spin_lock(&mdsc->cap_dirty_lock);
	if (list_empty(&ci->i_cap_flush_list))
		list_del_init(&ci->i_flushing_item);

	ret = __detach_cap_flush_from_mdsc(mdsc, &capsnap->cap_flush);
	if (wake_mdsc)
		*wake_mdsc = ret;
	spin_unlock(&mdsc->cap_dirty_lock);
}
void ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
			 bool *wake_ci, bool *wake_mdsc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);

	lockdep_assert_held(&ci->i_ceph_lock);

	WARN_ON_ONCE(capsnap->dirty_pages || capsnap->writing);
	__ceph_remove_capsnap(inode, capsnap, wake_ci, wake_mdsc);
}
/*
 * Handle FLUSHSNAP_ACK. MDS has flushed snap data to disk and we can
 * throw away our cap_snap.
 *
 * Caller holds s_mutex.
 */
static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
				     struct ceph_mds_caps *m,
				     struct ceph_mds_session *session)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	u64 follows = le64_to_cpu(m->snap_follows);
	struct ceph_cap_snap *capsnap = NULL, *iter;
	bool wake_ci = false;
	bool wake_mdsc = false;

	doutc(cl, "%p %llx.%llx ci %p mds%d follows %lld\n", inode,
	      ceph_vinop(inode), ci, session->s_mds, follows);

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(iter, &ci->i_cap_snaps, ci_item) {
		if (iter->follows == follows) {
			if (iter->cap_flush.tid != flush_tid) {
				doutc(cl, " cap_snap %p follows %lld "
				      "tid %lld != %lld\n", iter,
				      follows, flush_tid,
				      iter->cap_flush.tid);
				break;
			}
			capsnap = iter;
			break;
		} else {
			doutc(cl, " skipping cap_snap %p follows %lld\n",
			      iter, iter->follows);
		}
	}
	if (capsnap)
		ceph_remove_capsnap(inode, capsnap, &wake_ci, &wake_mdsc);
	spin_unlock(&ci->i_ceph_lock);
	if (capsnap) {
		ceph_put_snap_context(capsnap->context);
		ceph_put_cap_snap(capsnap);
		if (wake_ci)
			wake_up_all(&ci->i_cap_wq);
		if (wake_mdsc)
			wake_up_all(&mdsc->cap_flushing_wq);
		iput(inode);
	}
}
/*
 * Handle TRUNC from MDS, indicating file truncation.
 *
 * caller holds s_mutex.
 */
static bool handle_cap_trunc(struct inode *inode,
			     struct ceph_mds_caps *trunc,
			     struct ceph_mds_session *session,
			     struct cap_extra_info *extra_info)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int mds = session->s_mds;
	int seq = le32_to_cpu(trunc->seq);
	u32 truncate_seq = le32_to_cpu(trunc->truncate_seq);
	u64 truncate_size = le64_to_cpu(trunc->truncate_size);
	u64 size = le64_to_cpu(trunc->size);
	int implemented = 0;
	int dirty = __ceph_caps_dirty(ci);
	int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
	bool queue_trunc = false;

	lockdep_assert_held(&ci->i_ceph_lock);

	issued |= implemented | dirty;

	/*
	 * If there is at least one crypto block then we'll trust
	 * fscrypt_file_size. If the real length of the file is 0, then
	 * ignore it (it has probably been truncated down to 0 by the MDS).
	 */
	if (IS_ENCRYPTED(inode) && size)
		size = extra_info->fscrypt_file_size;

	doutc(cl, "%p %llx.%llx mds%d seq %d to %lld truncate seq %d\n",
	      inode, ceph_vinop(inode), mds, seq, truncate_size, truncate_seq);
	queue_trunc = ceph_fill_file_size(inode, issued,
					  truncate_seq, truncate_size, size);
	return queue_trunc;
}
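
/*
 * The boolean returned above tells the caller whether to queue a deferred
 * vmtruncate: the actual page-cache truncation cannot run in message-handler
 * context, so it is kicked off only after i_ceph_lock has been dropped.
 */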
/*
 * Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a
 * different one. If we are the most recent migration we've seen (as
 * indicated by mseq), make note of the migrating cap bits for the
 * duration (until we see the corresponding IMPORT).
 *
 * caller holds s_mutex
 */
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
			      struct ceph_mds_cap_peer *ph,
			      struct ceph_mds_session *session)
{
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *tsession = NULL;
	struct ceph_cap *cap, *tcap, *new_cap = NULL;
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 t_cap_id;
	unsigned mseq = le32_to_cpu(ex->migrate_seq);
	unsigned t_seq, t_mseq;
	int target, issued;
	int mds = session->s_mds;

	if (ph) {
		t_cap_id = le64_to_cpu(ph->cap_id);
		t_seq = le32_to_cpu(ph->seq);
		t_mseq = le32_to_cpu(ph->mseq);
		target = le32_to_cpu(ph->mds);
	} else {
		t_cap_id = t_seq = t_mseq = 0;
		target = -1;
	}

	doutc(cl, "%p %llx.%llx ci %p mds%d mseq %d target %d\n",
	      inode, ceph_vinop(inode), ci, mds, mseq, target);
retry:
	down_read(&mdsc->snap_rwsem);
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
		goto out_unlock;

	if (target < 0) {
		ceph_remove_cap(mdsc, cap, false);
		goto out_unlock;
	}

	/*
	 * now we know we haven't received the cap import message yet
	 * because the exported cap still exists.
	 */

	issued = cap->issued;
	if (issued != cap->implemented)
		pr_err_ratelimited_client(cl, "issued != implemented: "
				"%p %llx.%llx mds%d seq %d mseq %d"
				" issued %s implemented %s\n",
				inode, ceph_vinop(inode), mds,
				cap->seq, cap->mseq,
				ceph_cap_string(issued),
				ceph_cap_string(cap->implemented));


	tcap = __get_cap_for_mds(ci, target);
	if (tcap) {
		/* already have caps from the target */
		if (tcap->cap_id == t_cap_id &&
		    ceph_seq_cmp(tcap->seq, t_seq) < 0) {
			doutc(cl, " updating import cap %p mds%d\n", tcap,
			      target);
			tcap->cap_id = t_cap_id;
			tcap->seq = t_seq - 1;
			tcap->issue_seq = t_seq - 1;
			tcap->issued |= issued;
			tcap->implemented |= issued;
			if (cap == ci->i_auth_cap) {
				ci->i_auth_cap = tcap;
				change_auth_cap_ses(ci, tcap->session);
			}
		}
		ceph_remove_cap(mdsc, cap, false);
		goto out_unlock;
	} else if (tsession) {
		/* add placeholder for the export target */
		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
		tcap = new_cap;
		ceph_add_cap(inode, tsession, t_cap_id, issued, 0,
			     t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);

		if (!list_empty(&ci->i_cap_flush_list) &&
		    ci->i_auth_cap == tcap) {
			spin_lock(&mdsc->cap_dirty_lock);
			list_move_tail(&ci->i_flushing_item,
				       &tcap->session->s_cap_flushing);
			spin_unlock(&mdsc->cap_dirty_lock);
		}

		ceph_remove_cap(mdsc, cap, false);
		goto out_unlock;
	}

	spin_unlock(&ci->i_ceph_lock);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);

	/* open target session */
	tsession = ceph_mdsc_open_export_target_session(mdsc, target);
	if (!IS_ERR(tsession)) {
		if (mds > target) {
			mutex_lock(&session->s_mutex);
			mutex_lock_nested(&tsession->s_mutex,
					  SINGLE_DEPTH_NESTING);
		} else {
			mutex_lock(&tsession->s_mutex);
			mutex_lock_nested(&session->s_mutex,
					  SINGLE_DEPTH_NESTING);
		}
		new_cap = ceph_get_cap(mdsc, NULL);
	} else {
		WARN_ON(1);
		tsession = NULL;
		target = -1;
		mutex_lock(&session->s_mutex);
	}
	goto retry;

out_unlock:
	spin_unlock(&ci->i_ceph_lock);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
	if (tsession) {
		mutex_unlock(&tsession->s_mutex);
		ceph_put_mds_session(tsession);
	}
	if (new_cap)
		ceph_put_cap(mdsc, new_cap);
}
/*
 * Handle cap IMPORT.
 *
 * caller holds s_mutex. acquires i_ceph_lock
 */
static void handle_cap_import(struct ceph_mds_client *mdsc,
			      struct inode *inode, struct ceph_mds_caps *im,
			      struct ceph_mds_cap_peer *ph,
			      struct ceph_mds_session *session,
			      struct ceph_cap **target_cap, int *old_issued)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap *cap, *ocap, *new_cap = NULL;
	int mds = session->s_mds;
	int issued;
	unsigned caps = le32_to_cpu(im->caps);
	unsigned wanted = le32_to_cpu(im->wanted);
	unsigned seq = le32_to_cpu(im->seq);
	unsigned mseq = le32_to_cpu(im->migrate_seq);
	u64 realmino = le64_to_cpu(im->realm);
	u64 cap_id = le64_to_cpu(im->cap_id);
	u64 p_cap_id;
	int peer;

	if (ph) {
		p_cap_id = le64_to_cpu(ph->cap_id);
		peer = le32_to_cpu(ph->mds);
	} else {
		p_cap_id = 0;
		peer = -1;
	}

	doutc(cl, "%p %llx.%llx ci %p mds%d mseq %d peer %d\n",
	      inode, ceph_vinop(inode), ci, mds, mseq, peer);
retry:
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		if (!new_cap) {
			spin_unlock(&ci->i_ceph_lock);
			new_cap = ceph_get_cap(mdsc, NULL);
			spin_lock(&ci->i_ceph_lock);
			goto retry;
		}
		cap = new_cap;
	} else {
		if (new_cap) {
			ceph_put_cap(mdsc, new_cap);
			new_cap = NULL;
		}
	}

	__ceph_caps_issued(ci, &issued);
	issued |= __ceph_caps_dirty(ci);

	ceph_add_cap(inode, session, cap_id, caps, wanted, seq, mseq,
		     realmino, CEPH_CAP_FLAG_AUTH, &new_cap);

	ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
	if (ocap && ocap->cap_id == p_cap_id) {
		doutc(cl, " remove export cap %p mds%d flags %d\n",
		      ocap, peer, ph->flags);
		if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
		    (ocap->seq != le32_to_cpu(ph->seq) ||
		     ocap->mseq != le32_to_cpu(ph->mseq))) {
			pr_err_ratelimited_client(cl, "mismatched seq/mseq: "
					"%p %llx.%llx mds%d seq %d mseq %d"
					" importer mds%d has peer seq %d mseq %d\n",
					inode, ceph_vinop(inode), peer,
					ocap->seq, ocap->mseq, mds,
					le32_to_cpu(ph->seq),
					le32_to_cpu(ph->mseq));
		}
		ceph_remove_cap(mdsc, ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
	}

	*old_issued = issued;
	*target_cap = cap;
}
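
/*
 * Decode the fscrypt fields (fscrypt_auth blob and fscrypt_file_size)
 * carried by cap messages with protocol version >= 12.
 */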
#ifdef CONFIG_FS_ENCRYPTION
static int parse_fscrypt_fields(void **p, void *end,
				struct cap_extra_info *extra)
{
	u32 len;

	ceph_decode_32_safe(p, end, extra->fscrypt_auth_len, bad);
	if (extra->fscrypt_auth_len) {
		ceph_decode_need(p, end, extra->fscrypt_auth_len, bad);
		extra->fscrypt_auth = kmalloc(extra->fscrypt_auth_len,
					      GFP_KERNEL);
		if (!extra->fscrypt_auth)
			return -ENOMEM;
		ceph_decode_copy_safe(p, end, extra->fscrypt_auth,
				      extra->fscrypt_auth_len, bad);
	}

	ceph_decode_32_safe(p, end, len, bad);
	if (len >= sizeof(u64)) {
		ceph_decode_64_safe(p, end, extra->fscrypt_file_size, bad);
		len -= sizeof(u64);
	}
	ceph_decode_skip_n(p, end, len, bad);
	return 0;
bad:
	return -EIO;
}
#else
static int parse_fscrypt_fields(void **p, void *end,
				struct cap_extra_info *extra)
{
	u32 len;

	/* Don't care about these fields unless we're encryption-capable */
	ceph_decode_32_safe(p, end, len, bad);
	if (len)
		ceph_decode_skip_n(p, end, len, bad);
	ceph_decode_32_safe(p, end, len, bad);
	if (len)
		ceph_decode_skip_n(p, end, len, bad);
	return 0;
bad:
	return -EIO;
}
#endif /* CONFIG_FS_ENCRYPTION */
/*
 * Handle a caps message from the MDS.
 *
 * Identify the appropriate session, inode, and call the right handler
 * based on the cap op.
 */
void ceph_handle_caps(struct ceph_mds_session *session,
		      struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	struct ceph_mds_caps *h;
	struct ceph_mds_cap_peer *peer = NULL;
	struct ceph_snap_realm *realm = NULL;
	int op;
	int msg_version = le16_to_cpu(msg->hdr.version);
	u32 seq, mseq;
	struct ceph_vino vino;
	void *snaptrace;
	size_t snaptrace_len;
	void *p, *end;
	struct cap_extra_info extra_info = {};
	bool queue_trunc;
	bool close_sessions = false;
	bool do_cap_release = false;

	doutc(cl, "from mds%d\n", session->s_mds);

	if (!ceph_inc_mds_stopping_blocker(mdsc, session))
		return;

	/* decode */
	end = msg->front.iov_base + msg->front.iov_len;
	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	h = msg->front.iov_base;
	op = le32_to_cpu(h->op);
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	mseq = le32_to_cpu(h->migrate_seq);

	snaptrace = h + 1;
	snaptrace_len = le32_to_cpu(h->snap_trace_len);
	p = snaptrace + snaptrace_len;

	if (msg_version >= 2) {
		u32 flock_len;
		ceph_decode_32_safe(&p, end, flock_len, bad);
		if (p + flock_len > end)
			goto bad;
		p += flock_len;
	}

	if (msg_version >= 3) {
		if (op == CEPH_CAP_OP_IMPORT) {
			if (p + sizeof(*peer) > end)
				goto bad;
			peer = p;
			p += sizeof(*peer);
		} else if (op == CEPH_CAP_OP_EXPORT) {
			/* recorded in unused fields */
			peer = (void *)&h->size;
		}
	}

	if (msg_version >= 4) {
		ceph_decode_64_safe(&p, end, extra_info.inline_version, bad);
		ceph_decode_32_safe(&p, end, extra_info.inline_len, bad);
		if (p + extra_info.inline_len > end)
			goto bad;
		extra_info.inline_data = p;
		p += extra_info.inline_len;
	}

	if (msg_version >= 5) {
		struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
		u32 epoch_barrier;

		ceph_decode_32_safe(&p, end, epoch_barrier, bad);
		ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
	}

	if (msg_version >= 8) {
		u32 pool_ns_len;

		/* version >= 6 */
		ceph_decode_skip_64(&p, end, bad);	// flush_tid
		/* version >= 7 */
		ceph_decode_skip_32(&p, end, bad);	// caller_uid
		ceph_decode_skip_32(&p, end, bad);	// caller_gid
		/* version >= 8 */
		ceph_decode_32_safe(&p, end, pool_ns_len, bad);
		if (pool_ns_len > 0) {
			ceph_decode_need(&p, end, pool_ns_len, bad);
			extra_info.pool_ns =
				ceph_find_or_create_string(p, pool_ns_len);
			p += pool_ns_len;
		}
	}

	if (msg_version >= 9) {
		struct ceph_timespec *btime;

		if (p + sizeof(*btime) > end)
			goto bad;
		btime = p;
		ceph_decode_timespec64(&extra_info.btime, btime);
		p += sizeof(*btime);
		ceph_decode_64_safe(&p, end, extra_info.change_attr, bad);
	}

	if (msg_version >= 11) {
		/* version >= 10 */
		ceph_decode_skip_32(&p, end, bad); // flags
		/* version >= 11 */
		extra_info.dirstat_valid = true;
		ceph_decode_64_safe(&p, end, extra_info.nfiles, bad);
		ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad);
	}

	if (msg_version >= 12) {
		if (parse_fscrypt_fields(&p, end, &extra_info))
			goto bad;
	}

	/* lookup ino */
	inode = ceph_find_inode(mdsc->fsc->sb, vino);
	doutc(cl, " op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op),
	      vino.ino, vino.snap, inode);

	mutex_lock(&session->s_mutex);
	doutc(cl, " mds%d seq %lld cap seq %u\n", session->s_mds,
	      session->s_seq, (unsigned)seq);

	if (!inode) {
		doutc(cl, " i don't have ino %llx\n", vino.ino);

		switch (op) {
		case CEPH_CAP_OP_IMPORT:
		case CEPH_CAP_OP_REVOKE:
		case CEPH_CAP_OP_GRANT:
			do_cap_release = true;
			break;
		default:
			break;
		}
		goto flush_cap_releases;
	}
	ci = ceph_inode(inode);

	/* these will work even if we don't have a cap yet */
	switch (op) {
	case CEPH_CAP_OP_FLUSHSNAP_ACK:
		handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
					 h, session);
		goto done;

	case CEPH_CAP_OP_EXPORT:
		handle_cap_export(inode, h, peer, session);
		goto done_unlocked;

	case CEPH_CAP_OP_IMPORT:
		realm = NULL;
		if (snaptrace_len) {
			down_write(&mdsc->snap_rwsem);
			if (ceph_update_snap_trace(mdsc, snaptrace,
						   snaptrace + snaptrace_len,
						   false, &realm)) {
				up_write(&mdsc->snap_rwsem);
				close_sessions = true;
				goto done;
			}
			downgrade_write(&mdsc->snap_rwsem);
		} else {
			down_read(&mdsc->snap_rwsem);
		}
		spin_lock(&ci->i_ceph_lock);
		handle_cap_import(mdsc, inode, h, peer, session,
				  &cap, &extra_info.issued);
		handle_cap_grant(inode, session, cap,
				 h, msg->middle, &extra_info);
		if (realm)
			ceph_put_snap_realm(mdsc, realm);
		goto done_unlocked;
	}

	/* the rest require a cap */
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
	if (!cap) {
		doutc(cl, " no cap on %p ino %llx.%llx from mds%d\n",
		      inode, ceph_ino(inode), ceph_snap(inode),
		      session->s_mds);
		spin_unlock(&ci->i_ceph_lock);
		switch (op) {
		case CEPH_CAP_OP_REVOKE:
		case CEPH_CAP_OP_GRANT:
			do_cap_release = true;
			break;
		default:
			break;
		}
		goto flush_cap_releases;
	}

	/* note that each of these drops i_ceph_lock for us */
	switch (op) {
	case CEPH_CAP_OP_REVOKE:
	case CEPH_CAP_OP_GRANT:
		__ceph_caps_issued(ci, &extra_info.issued);
		extra_info.issued |= __ceph_caps_dirty(ci);
		handle_cap_grant(inode, session, cap,
				 h, msg->middle, &extra_info);
		goto done_unlocked;

	case CEPH_CAP_OP_FLUSH_ACK:
		handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
				     h, session, cap);
		break;

	case CEPH_CAP_OP_TRUNC:
		queue_trunc = handle_cap_trunc(inode, h, session,
					       &extra_info);
		spin_unlock(&ci->i_ceph_lock);
		if (queue_trunc)
			ceph_queue_vmtruncate(inode);
		break;

	default:
		spin_unlock(&ci->i_ceph_lock);
		pr_err_client(cl, "unknown cap op %d %s\n", op,
			      ceph_cap_op_name(op));
	}

done:
	mutex_unlock(&session->s_mutex);
done_unlocked:
	iput(inode);
out:
	ceph_dec_mds_stopping_blocker(mdsc);

	ceph_put_string(extra_info.pool_ns);

	/* Defer closing the sessions after s_mutex lock being released */
	if (close_sessions)
		ceph_mdsc_close_sessions(mdsc);

	kfree(extra_info.fscrypt_auth);
	return;

flush_cap_releases:
	/*
	 * send any cap release message to try to move things
	 * along for the mds (who clearly thinks we still have this
	 * cap).
	 */
	if (do_cap_release) {
		cap = ceph_get_cap(mdsc, NULL);
		cap->cap_ino = vino.ino;
		cap->queue_release = 1;
		cap->cap_id = le64_to_cpu(h->cap_id);
		cap->mseq = mseq;
		cap->seq = seq;
		cap->issue_seq = seq;
		spin_lock(&session->s_cap_lock);
		__ceph_queue_cap_release(session, cap);
		spin_unlock(&session->s_cap_lock);
	}
	ceph_flush_session_cap_releases(mdsc, session);
	goto done;

bad:
	pr_err_client(cl, "corrupt message\n");
	ceph_msg_dump(msg);
	goto out;
}
/*
 * Delayed work handler to process end of delayed cap release LRU list.
 *
 * If new caps are added to the list while processing it, these won't get
 * processed in this run.  In this case, the ci->i_hold_caps_max will be
 * returned so that the work can be scheduled accordingly.
 */
unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
	unsigned long delay_max = opt->caps_wanted_delay_max * HZ;
	unsigned long loop_start = jiffies;
	unsigned long delay = 0;

	doutc(cl, "begin\n");
	spin_lock(&mdsc->cap_delay_lock);
	while (!list_empty(&mdsc->cap_delay_list)) {
		ci = list_first_entry(&mdsc->cap_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) {
			doutc(cl, "caps added recently.  Exiting loop");
			delay = ci->i_hold_caps_max;
			break;
		}
		if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
		    time_before(jiffies, ci->i_hold_caps_max))
			break;
		list_del_init(&ci->i_cap_delay_list);

		inode = igrab(&ci->netfs.inode);
		if (inode) {
			spin_unlock(&mdsc->cap_delay_lock);
			doutc(cl, "on %p %llx.%llx\n", inode,
			      ceph_vinop(inode));
			ceph_check_caps(ci, 0);
			iput(inode);
			spin_lock(&mdsc->cap_delay_lock);
		}

		/*
		 * Make sure too many dirty caps or general
		 * slowness doesn't block mdsc delayed work,
		 * preventing send_renew_caps() from running.
		 */
		if (time_after_eq(jiffies, loop_start + 5 * HZ))
			break;
	}
	spin_unlock(&mdsc->cap_delay_lock);
	doutc(cl, "done\n");

	return delay;
}
/*
 * Flush all dirty caps to the mds
 */
static void flush_dirty_session_caps(struct ceph_mds_session *s)
{
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_inode_info *ci;
	struct inode *inode;

	doutc(cl, "begin\n");
	spin_lock(&mdsc->cap_dirty_lock);
	while (!list_empty(&s->s_cap_dirty)) {
		ci = list_first_entry(&s->s_cap_dirty, struct ceph_inode_info,
				      i_dirty_item);
		inode = &ci->netfs.inode;
		ihold(inode);
		doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
		spin_unlock(&mdsc->cap_dirty_lock);
		ceph_wait_on_async_create(inode);
		ceph_check_caps(ci, CHECK_CAPS_FLUSH);
		iput(inode);
		spin_lock(&mdsc->cap_dirty_lock);
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	doutc(cl, "done\n");
}

void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
{
	ceph_mdsc_iterate_sessions(mdsc, flush_dirty_session_caps, true);
}
/*
 * Flush all cap releases to the mds
 */
static void flush_cap_releases(struct ceph_mds_session *s)
{
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "begin\n");
	spin_lock(&s->s_cap_lock);
	if (s->s_num_cap_releases)
		ceph_flush_session_cap_releases(mdsc, s);
	spin_unlock(&s->s_cap_lock);
	doutc(cl, "done\n");
}

void ceph_flush_cap_releases(struct ceph_mds_client *mdsc)
{
	ceph_mdsc_iterate_sessions(mdsc, flush_cap_releases, true);
}
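
/*
 * Record read/write activity for the given file mode(s), and requeue the
 * inode on the delayed cap check list if it still holds real caps.
 */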
void __ceph_touch_fmode(struct ceph_inode_info *ci,
			struct ceph_mds_client *mdsc, int fmode)
{
	unsigned long now = jiffies;
	if (fmode & CEPH_FILE_MODE_RD)
		ci->i_last_rd = now;
	if (fmode & CEPH_FILE_MODE_WR)
		ci->i_last_wr = now;
	/* queue periodic check */
	if (fmode &&
	    __ceph_is_any_real_caps(ci) &&
	    list_empty(&ci->i_cap_delay_list))
		__cap_delay_requeue(mdsc, ci);
}
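
/*
 * Take open file mode references. The PIN reference (bit 0) is always
 * taken alongside the requested mode bits, and the opened file/inode
 * metrics are updated accordingly.
 */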
void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->netfs.inode.i_sb);
	int bits = (fmode << 1) | 1;
	bool already_opened = false;
	int i;

	if (count == 1)
		atomic64_inc(&mdsc->metric.opened_files);

	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		/*
		 * If any of the mode ref is larger than 0,
		 * that means it has been already opened by
		 * others. Just skip checking the PIN ref.
		 */
		if (i && ci->i_nr_by_mode[i])
			already_opened = true;

		if (bits & (1 << i))
			ci->i_nr_by_mode[i] += count;
	}

	if (!already_opened)
		percpu_counter_inc(&mdsc->metric.opened_inodes);
	spin_unlock(&ci->i_ceph_lock);
}
/*
 * Drop open file reference.  If we were the last open file,
 * we may need to release capabilities to the MDS (or schedule
 * their delayed release).
 */
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->netfs.inode.i_sb);
	int bits = (fmode << 1) | 1;
	bool is_closed = true;
	int i;

	if (count == 1)
		atomic64_dec(&mdsc->metric.opened_files);

	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i)) {
			BUG_ON(ci->i_nr_by_mode[i] < count);
			ci->i_nr_by_mode[i] -= count;
		}

		/*
		 * If any of the mode ref is not 0 after
		 * decreased, that means it is still opened
		 * by others. Just skip checking the PIN ref.
		 */
		if (i && ci->i_nr_by_mode[i])
			is_closed = false;
	}

	if (is_closed)
		percpu_counter_dec(&mdsc->metric.opened_inodes);
	spin_unlock(&ci->i_ceph_lock);
}
/*
 * For a soon-to-be unlinked file, drop the LINK caps. If it
 * looks like the link count will hit 0, drop any other caps (other
 * than PIN) we don't specifically want (due to the file still being
 * open).
 */
int ceph_drop_caps_for_unlink(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;

	spin_lock(&ci->i_ceph_lock);
	if (inode->i_nlink == 1) {
		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);

		if (__ceph_caps_dirty(ci)) {
			struct ceph_mds_client *mdsc =
				ceph_inode_to_fs_client(inode)->mdsc;

			doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode,
			      ceph_vinop(inode));
			spin_lock(&mdsc->cap_delay_lock);
			ci->i_ceph_flags |= CEPH_I_FLUSH;
			if (!list_empty(&ci->i_cap_delay_list))
				list_del_init(&ci->i_cap_delay_list);
			list_add_tail(&ci->i_cap_delay_list,
				      &mdsc->cap_unlink_delay_list);
			spin_unlock(&mdsc->cap_delay_lock);

			/*
			 * Fire the work immediately, because the MDS maybe
			 * waiting for caps release.
			 */
			ceph_queue_cap_unlink_work(mdsc);
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return drop;
}
/*
 * Helpers for embedding cap and dentry lease releases into mds
 * requests.
 *
 * @force is used by dentry_release (below) to force inclusion of a
 * record for the directory inode, even when there aren't any caps to
 * drop.
 */
int ceph_encode_inode_release(void **p, struct inode *inode,
			      int mds, int drop, int unless, int force)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap *cap;
	struct ceph_mds_request_release *rel = *p;
	int used, dirty;
	int ret = 0;

	spin_lock(&ci->i_ceph_lock);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);

	doutc(cl, "%p %llx.%llx mds%d used|dirty %s drop %s unless %s\n",
	      inode, ceph_vinop(inode), mds, ceph_cap_string(used|dirty),
	      ceph_cap_string(drop), ceph_cap_string(unless));

	/* only drop unused, clean caps */
	drop &= ~(used | dirty);

	cap = __get_cap_for_mds(ci, mds);
	if (cap && __cap_is_valid(cap)) {
		unless &= cap->issued;
		if (unless) {
			if (unless & CEPH_CAP_AUTH_EXCL)
				drop &= ~CEPH_CAP_AUTH_SHARED;
			if (unless & CEPH_CAP_LINK_EXCL)
				drop &= ~CEPH_CAP_LINK_SHARED;
			if (unless & CEPH_CAP_XATTR_EXCL)
				drop &= ~CEPH_CAP_XATTR_SHARED;
			if (unless & CEPH_CAP_FILE_EXCL)
				drop &= ~CEPH_CAP_FILE_SHARED;
		}

		if (force || (cap->issued & drop)) {
			if (cap->issued & drop) {
				int wanted = __ceph_caps_wanted(ci);
				doutc(cl, "%p %llx.%llx cap %p %s -> %s, "
				      "wanted %s -> %s\n", inode,
				      ceph_vinop(inode), cap,
				      ceph_cap_string(cap->issued),
				      ceph_cap_string(cap->issued & ~drop),
				      ceph_cap_string(cap->mds_wanted),
				      ceph_cap_string(wanted));

				cap->issued &= ~drop;
				cap->implemented &= ~drop;
				cap->mds_wanted = wanted;
				if (cap == ci->i_auth_cap &&
				    !(wanted & CEPH_CAP_ANY_FILE_WR))
					ci->i_requested_max_size = 0;
			} else {
				doutc(cl, "%p %llx.%llx cap %p %s (force)\n",
				      inode, ceph_vinop(inode), cap,
				      ceph_cap_string(cap->issued));
			}

			rel->ino = cpu_to_le64(ceph_ino(inode));
			rel->cap_id = cpu_to_le64(cap->cap_id);
			rel->seq = cpu_to_le32(cap->seq);
			rel->issue_seq = cpu_to_le32(cap->issue_seq);
			rel->mseq = cpu_to_le32(cap->mseq);
			rel->caps = cpu_to_le32(cap->implemented);
			rel->wanted = cpu_to_le32(cap->mds_wanted);
			rel->dname_len = 0;
			rel->dname_seq = 0;
			*p += sizeof(*rel);
			ret = 1;
		} else {
			doutc(cl, "%p %llx.%llx cap %p %s (noop)\n",
			      inode, ceph_vinop(inode), cap,
			      ceph_cap_string(cap->issued));
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return ret;
}
/**
 * ceph_encode_dentry_release - encode a dentry release into an outgoing request
 * @p: outgoing request buffer
 * @dentry: dentry to release
 * @dir: dir to release it from
 * @mds: mds that we're speaking to
 * @drop: caps being dropped
 * @unless: unless we have these caps
 *
 * Encode a dentry release into an outgoing request buffer. Returns 1 if the
 * thing was released, or a negative error code otherwise.
 */
int ceph_encode_dentry_release(void **p, struct dentry *dentry,
			       struct inode *dir,
			       int mds, int drop, int unless)
{
	struct ceph_mds_request_release *rel = *p;
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	struct ceph_client *cl;
	int force = 0;
	int ret;

	/* This shouldn't happen */
	BUG_ON(!dir);

	/*
	 * force an record for the directory caps if we have a dentry lease.
	 * this is racy (can't take i_ceph_lock and d_lock together), but it
	 * doesn't have to be perfect; the mds will revoke anything we don't
	 * release.
	 */
	spin_lock(&dentry->d_lock);
	if (di->lease_session && di->lease_session->s_mds == mds)
		force = 1;
	spin_unlock(&dentry->d_lock);

	ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);

	cl = ceph_inode_to_client(dir);
	spin_lock(&dentry->d_lock);
	if (ret && di->lease_session && di->lease_session->s_mds == mds) {
		doutc(cl, "%p mds%d seq %d\n", dentry, mds,
		      (int)di->lease_seq);
		rel->dname_seq = cpu_to_le32(di->lease_seq);
		__ceph_mdsc_drop_dentry_lease(dentry);
		spin_unlock(&dentry->d_lock);
		if (IS_ENCRYPTED(dir) && fscrypt_has_encryption_key(dir)) {
			int ret2 = ceph_encode_encrypted_fname(dir, dentry, *p);

			if (ret2 < 0)
				return ret2;

			rel->dname_len = cpu_to_le32(ret2);
			*p += ret2;
		} else {
			rel->dname_len = cpu_to_le32(dentry->d_name.len);
			memcpy(*p, dentry->d_name.name, dentry->d_name.len);
			*p += dentry->d_name.len;
		}
	} else {
		spin_unlock(&dentry->d_lock);
	}
	return ret;
}
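
/*
 * Remove and release all cap snapshots attached to an inode. Returns the
 * number of capsnaps removed, which the caller uses as the number of
 * inode references left to drop.
 */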
static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap_snap *capsnap;
	int capsnap_release = 0;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "removing capsnaps, ci is %p, %p %llx.%llx\n",
	      ci, inode, ceph_vinop(inode));

	while (!list_empty(&ci->i_cap_snaps)) {
		capsnap = list_first_entry(&ci->i_cap_snaps,
					   struct ceph_cap_snap, ci_item);
		__ceph_remove_capsnap(inode, capsnap, NULL, NULL);
		ceph_put_snap_context(capsnap->context);
		ceph_put_cap_snap(capsnap);
		capsnap_release++;
	}
	wake_up_all(&ci->i_cap_wq);
	wake_up_all(&mdsc->cap_flushing_wq);
	return capsnap_release;
}
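
/*
 * Forcibly purge the inode's cap for one MDS during eviction or forced
 * unmount. If it is the auth cap, any remaining dirty/flushing cap state
 * is dropped, queued cap flushes are discarded, and further file lock
 * calls are made to fail with -EIO.
 */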
int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate)
{
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_client *cl = fsc->client;
	struct ceph_inode_info *ci = ceph_inode(inode);
	bool is_auth;
	bool dirty_dropped = false;
	int iputs = 0;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "removing cap %p, ci is %p, %p %llx.%llx\n",
	      cap, ci, inode, ceph_vinop(inode));

	is_auth = (cap == ci->i_auth_cap);
	__ceph_remove_cap(cap, false);
	if (is_auth) {
		struct ceph_cap_flush *cf;

		if (ceph_inode_is_shutdown(inode)) {
			if (inode->i_data.nrpages > 0)
				*invalidate = true;

			if (ci->i_wrbuffer_ref > 0)
				mapping_set_error(&inode->i_data, -EIO);
		}

		spin_lock(&mdsc->cap_dirty_lock);

		/* trash all of the cap flushes for this inode */
		while (!list_empty(&ci->i_cap_flush_list)) {
			cf = list_first_entry(&ci->i_cap_flush_list,
					      struct ceph_cap_flush, i_list);
			list_del_init(&cf->g_list);
			list_del_init(&cf->i_list);
			if (!cf->is_capsnap)
				ceph_free_cap_flush(cf);
		}

		if (!list_empty(&ci->i_dirty_item)) {
			pr_warn_ratelimited_client(cl,
				" dropping dirty %s state for %p %llx.%llx\n",
				ceph_cap_string(ci->i_dirty_caps),
				inode, ceph_vinop(inode));
			ci->i_dirty_caps = 0;
			list_del_init(&ci->i_dirty_item);
			dirty_dropped = true;
		}
		if (!list_empty(&ci->i_flushing_item)) {
			pr_warn_ratelimited_client(cl,
				" dropping dirty+flushing %s state for %p %llx.%llx\n",
				ceph_cap_string(ci->i_flushing_caps),
				inode, ceph_vinop(inode));
			ci->i_flushing_caps = 0;
			list_del_init(&ci->i_flushing_item);
			mdsc->num_cap_flushing--;
			dirty_dropped = true;
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		if (dirty_dropped) {
			mapping_set_error(inode->i_mapping, -EIO);

			if (ci->i_wrbuffer_ref_head == 0 &&
			    ci->i_wr_ref == 0 &&
			    ci->i_dirty_caps == 0 &&
			    ci->i_flushing_caps == 0) {
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		}

		if (atomic_read(&ci->i_filelock_ref) > 0) {
			/* make further file lock syscall return -EIO */
			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
			pr_warn_ratelimited_client(cl,
				" dropping file locks for %p %llx.%llx\n",
				inode, ceph_vinop(inode));
		}

		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
			cf = ci->i_prealloc_cap_flush;
			ci->i_prealloc_cap_flush = NULL;
			if (!cf->is_capsnap)
				ceph_free_cap_flush(cf);
		}

		if (!list_empty(&ci->i_cap_snaps))
			iputs = remove_capsnaps(mdsc, inode);