// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/swap.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
#include <linux/iversion.h>
#include <linux/ktime.h>
#include <linux/netfs.h>
#include <trace/events/netfs.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "metric.h"
#include "crypto.h"
#include <linux/ceph/osd_client.h>
#include <linux/ceph/striper.h>
/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */
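/*
 * A concrete example of the accounting above: with no snapshots, dirtying
 * three pages gives i_wrbuffer_ref == i_wrbuffer_ref_head == 3.  If a
 * snapshot is then taken, that head count of 3 moves to the new
 * capsnap->dirty, i_wrbuffer_ref_head restarts at 0, and any page dirtied
 * afterwards counts against the new head context.  Writeback drains the
 * capsnap's 3 pages first, then the head pages.
 */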
#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
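/*
 * Worked example for the thresholds above (assuming 4 KiB pages, so
 * PAGE_SHIFT - 10 == 2): with congestion_kb = 8192, writeback is marked
 * congested once more than 8192 >> 2 == 2048 pages are in flight, and the
 * congestion flag is cleared again once the count drops below
 * 2048 - (2048 >> 2) == 1536 pages (i.e. 75% of the "on" threshold).
 */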
static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct folio **foliop, void **_fsdata);
static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	if (PagePrivate(page))
		return (void *)page->private;
	return NULL;
}
/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
{
	struct inode *inode = mapping->host;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	if (folio_test_dirty(folio)) {
		doutc(cl, "%llx.%llx %p idx %lu -- already dirty\n",
		      ceph_vinop(inode), folio, folio->index);
		VM_BUG_ON_FOLIO(!folio_test_private(folio), folio);
		return false;
	}

	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	doutc(cl, "%llx.%llx %p idx %lu head %d/%d -> %d/%d "
	      "snapc %p seq %lld (%d snaps)\n",
	      ceph_vinop(inode), folio, folio->index,
	      ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	      ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	      snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in folio->private.  Also set
	 * PagePrivate so that we get invalidate_folio callback.
	 */
	VM_WARN_ON_FOLIO(folio->private, folio);
	folio_attach_private(folio, snapc);

	return ceph_fscache_dirty_folio(mapping, folio);
}
/*
 * If we are truncating the full folio (i.e. offset == 0), adjust the
 * dirty folio counters appropriately.  Only called if there is private
 * data on the folio.
 */
static void ceph_invalidate_folio(struct folio *folio, size_t offset,
				  size_t length)
{
	struct inode *inode = folio->mapping->host;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	if (offset != 0 || length != folio_size(folio)) {
		doutc(cl, "%llx.%llx idx %lu partial dirty page %zu~%zu\n",
		      ceph_vinop(inode), folio->index, offset, length);
		return;
	}

	WARN_ON(!folio_test_locked(folio));
	if (folio_test_private(folio)) {
		doutc(cl, "%llx.%llx idx %lu full dirty page\n",
		      ceph_vinop(inode), folio->index);

		snapc = folio_detach_private(folio);
		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
		ceph_put_snap_context(snapc);
	}

	netfs_invalidate_folio(folio, offset, length);
}
static void ceph_netfs_expand_readahead(struct netfs_io_request *rreq)
{
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_layout *lo = &ci->i_layout;
	unsigned long max_pages = inode->i_sb->s_bdi->ra_pages;
	loff_t end = rreq->start + rreq->len, new_end;
	struct ceph_netfs_request_data *priv = rreq->netfs_priv;
	unsigned long max_len;
	u32 blockoff;

	if (priv) {
		/* Readahead is disabled by posix_fadvise POSIX_FADV_RANDOM */
		if (priv->file_ra_disabled)
			max_pages = 0;
		else
			max_pages = priv->file_ra_pages;
	}

	/* Readahead is disabled */
	if (!max_pages)
		return;

	max_len = max_pages << PAGE_SHIFT;

	/*
	 * Try to expand the length forward by rounding up it to the next
	 * block, but do not exceed the file size, unless the original
	 * request already exceeds it.
	 */
	new_end = umin(round_up(end, lo->stripe_unit), rreq->i_size);
	if (new_end > end && new_end <= rreq->start + max_len)
		rreq->len = new_end - rreq->start;

	/* Try to expand the start downward */
	div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
	if (rreq->len + blockoff <= max_len) {
		rreq->start -= blockoff;
		rreq->len += blockoff;
	}
}
static void finish_netfs_read(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
	struct netfs_io_subrequest *subreq = req->r_priv;
	struct ceph_osd_req_op *op = &req->r_ops[0];
	int err = req->r_result;
	bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);

	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, osd_data->length, err);

	doutc(cl, "result %d subreq->len=%zu i_size=%lld\n", req->r_result,
	      subreq->len, i_size_read(req->r_inode));

	/* no object means success but no data */
	if (err == -ENOENT)
		err = 0;
	else if (err == -EBLOCKLISTED)
		fsc->blocklisted = true;

	if (err >= 0) {
		if (sparse && err > 0)
			err = ceph_sparse_ext_map_end(op);
		if (err < subreq->len &&
		    subreq->rreq->origin != NETFS_DIO_READ)
			__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
		if (IS_ENCRYPTED(inode) && err > 0) {
			err = ceph_fscrypt_decrypt_extents(inode,
					osd_data->pages, subreq->start,
					op->extent.sparse_ext,
					op->extent.sparse_ext_cnt);
			if (err > subreq->len)
				err = subreq->len;
		}
	}

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		ceph_put_page_vector(osd_data->pages,
				     calc_pages_for(osd_data->alignment,
					osd_data->length), false);
	}
	if (err > 0) {
		subreq->transferred = err;
		err = 0;
	}
	trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress);
	netfs_read_subreq_terminated(subreq, err, false);
	iput(inode);
	ceph_dec_osd_stopping_blocker(fsc->mdsc);
}
static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_mds_reply_info_parsed *rinfo;
	struct ceph_mds_reply_info_in *iinfo;
	struct ceph_mds_request *req;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_inode_info *ci = ceph_inode(inode);
	ssize_t err = 0;
	size_t len;
	int mode;

	if (rreq->origin != NETFS_DIO_READ)
		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
	__clear_bit(NETFS_SREQ_COPY_TO_CACHE, &subreq->flags);

	if (subreq->start >= inode->i_size)
		goto out;

	/* We need to fetch the inline data. */
	mode = ceph_try_to_choose_auth_mds(inode, CEPH_STAT_CAP_INLINE_DATA);
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_ino1 = ci->i_vino;
	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA);

	trace_netfs_sreq(subreq, netfs_sreq_trace_submit);
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	if (err) {
		ceph_mdsc_put_request(req);
		goto out;
	}

	rinfo = &req->r_reply_info;
	iinfo = &rinfo->targeti;
	if (iinfo->inline_version == CEPH_INLINE_NONE) {
		/* The data got uninlined */
		ceph_mdsc_put_request(req);
		return false;
	}

	len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len);
	err = copy_to_iter(iinfo->inline_data + subreq->start, len, &subreq->io_iter);
	if (err == 0) {
		err = -EFAULT;
	} else {
		subreq->transferred += err;
		err = 0;
	}

	ceph_mdsc_put_request(req);
out:
	netfs_read_subreq_terminated(subreq, err, false);
	return true;
}
static int ceph_netfs_prepare_read(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	u64 objno, objoff;
	u32 xlen;

	/* Truncate the extent at the end of the current block */
	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
				      &objno, &objoff, &xlen);
	rreq->io_streams[0].sreq_max_len = umin(xlen, fsc->mount_options->rsize);
	return 0;
}
static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_request *req = NULL;
	struct ceph_vino vino = ceph_vino(inode);
	int err = 0;
	u64 len = subreq->len;
	bool sparse = IS_ENCRYPTED(inode) || ceph_test_mount_opt(fsc, SPARSEREAD);
	u64 off = subreq->start;
	int extent_cnt;

	if (ceph_inode_is_shutdown(inode)) {
		err = -EIO;
		goto out;
	}

	if (ceph_has_inline_data(ci) && ceph_netfs_issue_op_inline(subreq))
		return;

	// TODO: This rounding here is slightly dodgy. It *should* work, for
	// now, as the cache only deals in blocks that are a multiple of
	// PAGE_SIZE and fscrypt blocks are at most PAGE_SIZE.  What needs to
	// happen is for the fscrypt driving to be moved into netfslib and the
	// data in the cache also to be stored encrypted.
	ceph_fscrypt_adjust_off_and_len(inode, &off, &len);

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino,
			off, &len, 0, 1, sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ,
			CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq,
			ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		req = NULL;
		goto out;
	}

	if (sparse) {
		extent_cnt = __ceph_sparse_read_ext_count(inode, len);
		err = ceph_alloc_sparse_ext_map(&req->r_ops[0], extent_cnt);
		if (err)
			goto out;
	}

	doutc(cl, "%llx.%llx pos=%llu orig_len=%zu len=%llu\n",
	      ceph_vinop(inode), subreq->start, subreq->len, len);

	/*
	 * FIXME: For now, use CEPH_OSD_DATA_TYPE_PAGES instead of _ITER for
	 * encrypted inodes. We'd need infrastructure that handles an iov_iter
	 * instead of page arrays, and we don't have that as of yet. Once the
	 * dust settles on the write helpers and encrypt/decrypt routines for
	 * netfs, we should be able to rework this.
	 */
	if (IS_ENCRYPTED(inode)) {
		struct page **pages;
		size_t page_off;

		err = iov_iter_get_pages_alloc2(&subreq->io_iter, &pages, len, &page_off);
		if (err < 0) {
			doutc(cl, "%llx.%llx failed to allocate pages, %d\n",
			      ceph_vinop(inode), err);
			goto out;
		}

		/* should always give us a page-aligned read */
		WARN_ON_ONCE(page_off);

		len = err;
		err = 0;

		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false,
						 false);
	} else {
		osd_req_op_extent_osd_iter(req, 0, &subreq->io_iter);
	}
	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
		err = -EIO;
		goto out;
	}
	req->r_callback = finish_netfs_read;
	req->r_priv = subreq;
	req->r_inode = inode;
	ihold(inode);

	trace_netfs_sreq(subreq, netfs_sreq_trace_submit);
	ceph_osdc_start_request(req->r_osdc, req);
out:
	ceph_osdc_put_request(req);
	if (err)
		netfs_read_subreq_terminated(subreq, err, false);
	doutc(cl, "%llx.%llx result %d\n", ceph_vinop(inode), err);
}
static int ceph_init_request(struct netfs_io_request *rreq, struct file *file)
{
	struct inode *inode = rreq->inode;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int got = 0, want = CEPH_CAP_FILE_CACHE;
	struct ceph_netfs_request_data *priv;
	int ret = 0;

	/* [DEPRECATED] Use PG_private_2 to mark folio being written to the cache. */
	__set_bit(NETFS_RREQ_USE_PGPRIV2, &rreq->flags);

	if (rreq->origin != NETFS_READAHEAD)
		return 0;

	priv = kzalloc(sizeof(*priv), GFP_NOFS);
	if (!priv)
		return -ENOMEM;

	if (file) {
		struct ceph_rw_context *rw_ctx;
		struct ceph_file_info *fi = file->private_data;

		priv->file_ra_pages = file->f_ra.ra_pages;
		priv->file_ra_disabled = file->f_mode & FMODE_RANDOM;

		rw_ctx = ceph_find_rw_context(fi);
		if (rw_ctx) {
			rreq->netfs_priv = priv;
			return 0;
		}
	}

	/*
	 * readahead callers do not necessarily hold Fcb caps
	 * (e.g. fadvise, madvise).
	 */
	ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
	if (ret < 0) {
		doutc(cl, "%llx.%llx, error getting cap\n", ceph_vinop(inode));
		goto out;
	}

	if (!(got & want)) {
		doutc(cl, "%llx.%llx, no cache cap\n", ceph_vinop(inode));
		ret = -EACCES;
		goto out;
	}
	if (ret == 0) {
		ret = -EACCES;
		goto out;
	}

	priv->caps = got;
	rreq->netfs_priv = priv;
	rreq->io_streams[0].sreq_max_len = fsc->mount_options->rsize;

out:
	if (ret < 0) {
		if (got)
			ceph_put_cap_refs(ceph_inode(inode), got);
		kfree(priv);
	}

	return ret;
}
static void ceph_netfs_free_request(struct netfs_io_request *rreq)
{
	struct ceph_netfs_request_data *priv = rreq->netfs_priv;

	if (!priv)
		return;

	if (priv->caps)
		ceph_put_cap_refs(ceph_inode(rreq->inode), priv->caps);
	kfree(priv);
	rreq->netfs_priv = NULL;
}
const struct netfs_request_ops ceph_netfs_ops = {
	.init_request		= ceph_init_request,
	.free_request		= ceph_netfs_free_request,
	.prepare_read		= ceph_netfs_prepare_read,
	.issue_read		= ceph_netfs_issue_read,
	.expand_readahead	= ceph_netfs_expand_readahead,
	.check_write_begin	= ceph_netfs_check_write_begin,
};
#ifdef CONFIG_CEPH_FSCACHE
static void ceph_set_page_fscache(struct page *page)
{
	folio_start_private_2(page_folio(page)); /* [DEPRECATED] */
}

static void ceph_fscache_write_terminated(void *priv, ssize_t error, bool was_async)
{
	struct inode *inode = priv;

	if (IS_ERR_VALUE(error) && error != -ENOBUFS)
		ceph_fscache_invalidate(inode, false);
}

static void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct fscache_cookie *cookie = ceph_fscache_cookie(ci);

	fscache_write_to_cache(cookie, inode->i_mapping, off, len, i_size_read(inode),
			       ceph_fscache_write_terminated, inode, true, caching);
}
#else
static inline void ceph_set_page_fscache(struct page *page)
{
}

static inline void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
{
}
#endif /* CONFIG_CEPH_FSCACHE */
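/*
 * ceph_writeback_ctl carries the per-writeback view of the inode that
 * get_oldest_context() fills in: the size and truncate_{seq,size} to use
 * for the snap context being written back, whether that size is stable
 * (a finalized capsnap) or may still grow (the live head), and whether we
 * are writing the head context at all.
 */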
struct ceph_writeback_ctl
{
	loff_t i_size;
	u64 truncate_size;
	u32 truncate_seq;
	bool size_stable;
	bool head_snapc;
};
/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		doutc(cl, " capsnap %p snapc %p has %d dirty pages\n",
		      capsnap, capsnap->context, capsnap->dirty_pages);
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		doutc(cl, " head snapc %p has %d dirty pages\n", snapc,
		      ci->i_wrbuffer_ref_head);
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}
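/*
 * get_writepages_data_length() returns how many bytes, starting at 'start',
 * this page contributes to a writeback request: it caps the end at the
 * capsnap's recorded size (or i_size for the head context) and at the end
 * of the page, and rounds the result up to a full fscrypt block when the
 * page is a bounce page for an encrypted inode.
 */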
static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;
	struct ceph_cap_snap *capsnap = NULL;
	u64 end = i_size_read(inode);
	u64 ret;

	snapc = page_snap_context(ceph_fscrypt_pagecache_page(page));
	if (snapc != ci->i_head_snapc) {
		bool found = false;

		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context == snapc) {
				if (!capsnap->writing)
					end = capsnap->size;
				found = true;
				break;
			}
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}
	if (end > ceph_fscrypt_page_offset(page) + thp_size(page))
		end = ceph_fscrypt_page_offset(page) + thp_size(page);
	ret = end > start ? end - start : 0;
	if (ret && fscrypt_is_bounce_page(page))
		ret = round_up(ret, CEPH_FSCRYPT_BLOCK_SIZE);
	return ret;
}
/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, mark the mapping for error, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct folio *folio = page_folio(page);
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	int err;
	loff_t len = thp_size(page);
	loff_t wlen;
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;
	bool caching = ceph_is_cache_enabled(inode);
	struct page *bounce_page = NULL;

	doutc(cl, "%llx.%llx page %p idx %lu\n", ceph_vinop(inode), page,
	      page->index);

	if (ceph_inode_is_shutdown(inode))
		return -EIO;

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (!snapc) {
		doutc(cl, "%llx.%llx page %p not dirty?\n", ceph_vinop(inode),
		      page);
		return 0;
	}
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
	if (snapc->seq > oldest->seq) {
		doutc(cl, "%llx.%llx page %p snapc %p not writeable - noop\n",
		      ceph_vinop(inode), page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON(!(current->flags & PF_MEMALLOC));
		ceph_put_snap_context(oldest);
		redirty_page_for_writepage(wbc, page);
		return 0;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (page_off >= ceph_wbc.i_size) {
		doutc(cl, "%llx.%llx folio at %lu beyond eof %llu\n",
		      ceph_vinop(inode), folio->index, ceph_wbc.i_size);
		folio_invalidate(folio, 0, folio_size(folio));
		return 0;
	}

	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	wlen = IS_ENCRYPTED(inode) ? round_up(len, CEPH_FSCRYPT_BLOCK_SIZE) : len;
	doutc(cl, "%llx.%llx page %p index %lu on %llu~%llu snapc %p seq %lld\n",
	      ceph_vinop(inode), page, page->index, page_off, wlen, snapc,
	      snapc->seq);

	if (atomic_long_inc_return(&fsc->writeback_count) >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		fsc->write_congested = true;

	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
				    page_off, &wlen, 0, 1, CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_WRITE, snapc,
				    ceph_wbc.truncate_seq,
				    ceph_wbc.truncate_size, true);
	if (IS_ERR(req)) {
		redirty_page_for_writepage(wbc, page);
		return PTR_ERR(req);
	}

	if (wlen < len)
		len = wlen;

	set_page_writeback(page);
	if (caching)
		ceph_set_page_fscache(page);
	ceph_fscache_write_to_cache(inode, page_off, len, caching);

	if (IS_ENCRYPTED(inode)) {
		bounce_page = fscrypt_encrypt_pagecache_blocks(page,
						    CEPH_FSCRYPT_BLOCK_SIZE, 0,
						    GFP_NOFS);
		if (IS_ERR(bounce_page)) {
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			ceph_osdc_put_request(req);
			return PTR_ERR(bounce_page);
		}
	}

	/* it may be a short write due to an object boundary */
	WARN_ON_ONCE(len > thp_size(page));
	osd_req_op_extent_osd_data_pages(req, 0,
			bounce_page ? &bounce_page : &page, wlen, 0,
			false, false);
	doutc(cl, "%llx.%llx %llu~%llu (%llu bytes, %sencrypted)\n",
	      ceph_vinop(inode), page_off, len, wlen,
	      IS_ENCRYPTED(inode) ? "" : "not ");

	req->r_mtime = inode_get_mtime(inode);
	ceph_osdc_start_request(osdc, req);
	err = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);
	fscrypt_free_bounce_page(bounce_page);
	ceph_osdc_put_request(req);
	if (err == 0)
		err = len;

	if (err < 0) {
		struct writeback_control tmp_wbc;

		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			doutc(cl, "%llx.%llx interrupted page %p\n",
			      ceph_vinop(inode), page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			return err;
		}
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		doutc(cl, "%llx.%llx setting page/mapping error %d %p\n",
		      ceph_vinop(inode), err, page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		doutc(cl, "%llx.%llx cleaned page %p\n",
		      ceph_vinop(inode), page);
		err = 0;  /* vfs expects us to return 0 */
	}

	oldest = detach_page_private(page);
	WARN_ON_ONCE(oldest != snapc);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);  /* page's reference */

	if (atomic_long_dec_return(&fsc->writeback_count) <
	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
		fsc->write_congested = false;

	return err;
}
static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
	int err;
	struct inode *inode = page->mapping->host;

	BUG_ON(!inode);
	ihold(inode);

	if (wbc->sync_mode == WB_SYNC_NONE &&
	    ceph_inode_to_fs_client(inode)->write_congested) {
		redirty_page_for_writepage(wbc, page);
		return AOP_WRITEPAGE_ACTIVATE;
	}

	folio_wait_private_2(page_folio(page)); /* [DEPRECATED] */

	err = writepage_nounlock(page, wbc);
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL. return 0
		 * to prevent caller from setting mapping/page error */
		err = 0;
	}
	unlock_page(page);
	iput(inode);
	return err;
}
/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bit.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	unsigned int len = 0;
	bool remove_page;

	doutc(cl, "%llx.%llx rc %d\n", ceph_vinop(inode), rc);
	if (rc < 0) {
		mapping_set_error(mapping, rc);
		ceph_set_error_write(ci);
		if (rc == -EBLOCKLISTED)
			fsc->blocklisted = true;
	} else {
		ceph_clear_error_write(ci);
	}

	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages */
	for (i = 0; i < req->r_num_ops; i++) {
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE) {
			pr_warn_client(cl,
				"%llx.%llx incorrect op %d req %p index %d tid %llu\n",
				ceph_vinop(inode), req->r_ops[i].op, req, i,
				req->r_tid);
			break;
		}

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		len += osd_data->length;
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			if (fscrypt_is_bounce_page(page)) {
				page = fscrypt_pagecache_page(page);
				fscrypt_free_bounce_page(osd_data->pages[j]);
				osd_data->pages[j] = page;
			}
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			     CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
				fsc->write_congested = false;

			ceph_put_snap_context(detach_page_private(page));
			end_page_writeback(page);
			doutc(cl, "unlocking %p\n", page);

			if (remove_page)
				generic_error_remove_folio(inode->i_mapping,
							   page_folio(page));

			unlock_page(page);
		}
		doutc(cl, "%llx.%llx wrote %llu bytes cleaned %d pages\n",
		      ceph_vinop(inode), osd_data->length,
		      rc >= 0 ? num_pages : 0);

		release_pages(osd_data->pages, num_pages);
	}

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, rc);

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
	ceph_dec_osd_stopping_blocker(fsc->mdsc);
}
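/*
 * Writeback for the whole mapping proceeds in snap order: each pass picks
 * the oldest snap context with dirty data, gathers its dirty pages in
 * batches of up to wsize bytes within one object/strip unit, and turns each
 * batch into a multi-extent OSD write completed by writepages_finish().
 */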
/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start_index, end = -1;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct folio_batch fbatch;
	int rc = 0;
	unsigned int wsize = i_blocksize(inode);
	struct ceph_osd_request *req = NULL;
	struct ceph_writeback_ctl ceph_wbc;
	bool should_loop, range_whole = false;
	bool done = false;
	bool caching = ceph_is_cache_enabled(inode);
	xa_mark_t tag;

	if (wbc->sync_mode == WB_SYNC_NONE &&
	    fsc->write_congested)
		return 0;

	doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
	      wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	      (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (ceph_inode_is_shutdown(inode)) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited_client(cl,
				"%llx.%llx %lld forced umount\n",
				ceph_vinop(inode), ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO; /* we're in a forced umount, don't write! */
	}
	if (fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;

	folio_batch_init(&fbatch);

	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
	index = start_index;

	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) {
		tag = PAGECACHE_TAG_TOWRITE;
	} else {
		tag = PAGECACHE_TAG_DIRTY;
	}
retry:
	/* find oldest snap context with dirty data */
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		doutc(cl, " no snap context with dirty data?\n");
		goto out;
	}
	doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", snapc,
	      snapc->seq, snapc->num_snaps);

	should_loop = false;
	if (ceph_wbc.head_snapc && snapc != last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			index = start_index;
			end = -1;
			if (index > 0)
				should_loop = true;
			doutc(cl, " cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			doutc(cl, " not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/* Do not respect wbc->range_{start,end}. Dirty pages
		 * in that range can be associated with newer snapc.
		 * They are not writeable until we write all dirty pages
		 * associated with 'snapc' get written */
		if (index > 0)
			should_loop = true;
		doutc(cl, " non-head snapc, range whole\n");
	}

	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
		tag_pages_for_writeback(mapping, index, end);

	ceph_put_snap_context(last_snapc);
	last_snapc = snapc;

	while (!done && index <= end) {
		int num_ops = 0, op_idx;
		unsigned i, nr_folios, max_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		struct page *page;
		pgoff_t strip_unit_end = 0;
		u64 offset = 0, len = 0;
		bool from_pool = false;

		max_pages = wsize >> PAGE_SHIFT;

get_more_pages:
		nr_folios = filemap_get_folios_tag(mapping, &index,
						   end, tag, &fbatch);
		doutc(cl, "pagevec_lookup_range_tag got %d\n", nr_folios);
		if (!nr_folios && !locked_pages)
			break;
		for (i = 0; i < nr_folios && locked_pages < max_pages; i++) {
			struct folio *folio = fbatch.folios[i];

			page = &folio->page;
			doutc(cl, "? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				doutc(cl, "!dirty or !mapping %p\n", page);
				unlock_page(page);
				continue;
			}
			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc != snapc) {
				doutc(cl, "page snapc %p %lld != oldest %p %lld\n",
				      pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				if (!should_loop &&
				    !ceph_wbc.head_snapc &&
				    wbc->sync_mode != WB_SYNC_NONE)
					should_loop = true;
				unlock_page(page);
				continue;
			}
			if (page_offset(page) >= ceph_wbc.i_size) {
				doutc(cl, "folio at %lu beyond eof %llu\n",
				      folio->index, ceph_wbc.i_size);
				if ((ceph_wbc.size_stable ||
				    folio_pos(folio) >= i_size_read(inode)) &&
				    folio_clear_dirty_for_io(folio))
					folio_invalidate(folio, 0,
							folio_size(folio));
				folio_unlock(folio);
				continue;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				doutc(cl, "end of strip unit %p\n", page);
				unlock_page(page);
				break;
			}
			if (folio_test_writeback(folio) ||
			    folio_test_private_2(folio) /* [DEPRECATED] */) {
				if (wbc->sync_mode == WB_SYNC_NONE) {
					doutc(cl, "%p under writeback\n", folio);
					folio_unlock(folio);
					continue;
				}
				doutc(cl, "waiting on writeback %p\n", folio);
				folio_wait_writeback(folio);
				folio_wait_private_2(folio); /* [DEPRECATED] */
			}

			if (!clear_page_dirty_for_io(page)) {
				doutc(cl, "%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				continue;
			}

			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
			 * calculate max possible write size and
			 * allocate a page array
			 */
			if (locked_pages == 0) {
				u64 objnum;
				u64 objoff;
				u32 xlen;

				/* prepare async write request */
				offset = (u64)page_offset(page);
				ceph_calc_file_object_mapping(&ci->i_layout,
							      offset, wsize,
							      &objnum, &objoff,
							      &xlen);
				len = xlen;

				num_ops = 1;
				strip_unit_end = page->index +
					((len - 1) >> PAGE_SHIFT);

				max_pages = calc_pages_for(0, (u64)len);
				pages = kmalloc_array(max_pages,
						      sizeof(*pages),
						      GFP_NOFS);
				if (!pages) {
					from_pool = true;
					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
					BUG_ON(!pages);
				}

				len = 0;
			} else if (page->index !=
				   (offset + len) >> PAGE_SHIFT) {
				if (num_ops >= (from_pool ?  CEPH_OSD_SLAB_OPS :
							     CEPH_OSD_MAX_OPS)) {
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
			}

			/* note position of first page in fbatch */
			doutc(cl, "%llx.%llx will write page %p idx %lu\n",
			      ceph_vinop(inode), page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb))
				fsc->write_congested = true;

			if (IS_ENCRYPTED(inode)) {
				pages[locked_pages] =
					fscrypt_encrypt_pagecache_blocks(page,
						PAGE_SIZE, 0,
						locked_pages ? GFP_NOWAIT : GFP_NOFS);
				if (IS_ERR(pages[locked_pages])) {
					if (PTR_ERR(pages[locked_pages]) == -EINVAL)
						pr_err_client(cl,
							"inode->i_blkbits=%hhu\n",
							inode->i_blkbits);
					/* better not fail on first page! */
					BUG_ON(locked_pages == 0);
					pages[locked_pages] = NULL;
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}
				++locked_pages;
			} else {
				pages[locked_pages++] = page;
			}

			fbatch.folios[i] = NULL;
			len += thp_size(page);
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_folios;
		if (i) {
			unsigned j, n = 0;

			/* shift unused page to beginning of fbatch */
			for (j = 0; j < nr_folios; j++) {
				if (!fbatch.folios[j])
					continue;
				if (n < j)
					fbatch.folios[n] = fbatch.folios[j];
				n++;
			}
			fbatch.nr = n;

			if (nr_folios && i == nr_folios &&
			    locked_pages < max_pages) {
				doutc(cl, "reached end fbatch, trying for more\n");
				folio_batch_release(&fbatch);
				goto get_more_pages;
			}
		}

new_request:
		offset = ceph_fscrypt_page_offset(pages[0]);
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					    &ci->i_layout, vino,
					    offset, &len, 0, num_ops,
					    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					    snapc, ceph_wbc.truncate_seq,
					    ceph_wbc.truncate_size, false);
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
						    &ci->i_layout, vino,
						    offset, &len, 0,
						    min(num_ops,
							CEPH_OSD_SLAB_OPS),
						    CEPH_OSD_OP_WRITE,
						    CEPH_OSD_FLAG_WRITE,
						    snapc, ceph_wbc.truncate_seq,
						    ceph_wbc.truncate_size, true);
			BUG_ON(IS_ERR(req));
		}
		BUG_ON(len < ceph_fscrypt_page_offset(pages[locked_pages - 1]) +
			     thp_size(pages[locked_pages - 1]) - offset);

		if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
			rc = -EIO;
			goto release_folios;
		}
		req->r_callback = writepages_finish;
		req->r_inode = inode;

		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			struct page *page = ceph_fscrypt_pagecache_page(pages[i]);
			u64 cur_offset = page_offset(page);

			/*
			 * Discontinuity in page range? Ceph can handle that by just passing
			 * multiple extents in the write op.
			 */
			if (offset + len != cur_offset) {
				/* If it's full, stop here */
				if (op_idx + 1 == req->r_num_ops)
					break;

				/* Kick off an fscache write with what we have so far. */
				ceph_fscache_write_to_cache(inode, offset, len, caching);

				/* Start a new extent */
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				doutc(cl, "got pages at %llu~%llu\n", offset,
				      len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
							from_pool, false);
				osd_req_op_extent_update(req, op_idx, len);

				len = 0;
				offset = cur_offset;
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(page);
			if (caching)
				ceph_set_page_fscache(page);
			len += thp_size(page);
		}
		ceph_fscache_write_to_cache(inode, offset, len, caching);

		if (ceph_wbc.size_stable) {
			len = min(len, ceph_wbc.i_size - offset);
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			u64 min_len = len + 1 - thp_size(page);
			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
			len = max(len, min_len);
		}
		if (IS_ENCRYPTED(inode))
			len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);

		doutc(cl, "got pages at %llu~%llu\n", offset, len);

		if (IS_ENCRYPTED(inode) &&
		    ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK))
			pr_warn_client(cl,
				"bad encrypted write offset=%lld len=%llu\n",
				offset, len);

		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
						 0, from_pool, false);
		osd_req_op_extent_update(req, op_idx, len);

		BUG_ON(op_idx + 1 != req->r_num_ops);

		from_pool = false;
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
			pages = kmalloc_array(locked_pages, sizeof(*pages),
					      GFP_NOFS);
			if (!pages) {
				from_pool = true;
				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}

		req->r_mtime = inode_get_mtime(inode);
		ceph_osdc_start_request(&fsc->client->osdc, req);
		req = NULL;

		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
			done = true;

release_folios:
		doutc(cl, "folio_batch release on %d folios (%p)\n",
		      (int)fbatch.nr, fbatch.nr ? fbatch.folios[0] : NULL);
		folio_batch_release(&fbatch);
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		doutc(cl, "looping back to beginning of file\n");
		end = start_index - 1; /* OK even when start_index == 0 */

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 && /* all dirty pages were checked */
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;

			index = 0;
			while ((index <= end) &&
			       (nr = filemap_get_folios_tag(mapping, &index,
						(pgoff_t)-1,
						PAGECACHE_TAG_WRITEBACK,
						&fbatch))) {
				for (i = 0; i < nr; i++) {
					page = &fbatch.folios[i]->page;
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				folio_batch_release(&fbatch);
				cond_resched();
			}
		}

		start_index = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	ceph_osdc_put_request(req);
	ceph_put_snap_context(last_snapc);
	doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode),
	      rc);
	return rc;
}
),
1426 * See if a given @snapc is either writeable, or already written.
1428 static int context_is_writeable_or_written(struct inode
*inode
,
1429 struct ceph_snap_context
*snapc
)
1431 struct ceph_snap_context
*oldest
= get_oldest_context(inode
, NULL
, NULL
);
1432 int ret
= !oldest
|| snapc
->seq
<= oldest
->seq
;
1434 ceph_put_snap_context(oldest
);
/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @page: page being dirtied
 *
 * We are only allowed to write into/dirty a page if the page is
 * clean, or already dirty within the same snap context. Returns a
 * conflicting context if there is one, NULL if there isn't, or a
 * negative error code on other errors.
 *
 * Must be called with page lock held.
 */
static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
	struct inode *inode = page->mapping->host;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);

	if (ceph_inode_is_shutdown(inode)) {
		doutc(cl, " %llx.%llx page %p is shutdown\n",
		      ceph_vinop(inode), page);
		return ERR_PTR(-ESTALE);
	}

	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		wait_on_page_writeback(page);

		snapc = page_snap_context(page);
		if (!snapc || snapc == ci->i_head_snapc)
			break;

		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			/* not writeable -- return it for the caller to deal with */
			ceph_put_snap_context(oldest);
			doutc(cl, " %llx.%llx page %p snapc %p not current or oldest\n",
			      ceph_vinop(inode), page, snapc);
			return ceph_get_snap_context(snapc);
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		doutc(cl, " %llx.%llx page %p snapc %p not current, but oldest\n",
		      ceph_vinop(inode), page, snapc);
		if (clear_page_dirty_for_io(page)) {
			int r = writepage_nounlock(page, NULL);
			if (r < 0)
				return ERR_PTR(r);
		}
	}
	return NULL;
}
static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct folio **foliop, void **_fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	snapc = ceph_find_incompatible(folio_page(*foliop, 0));
	if (snapc) {
		int r;

		folio_unlock(*foliop);
		folio_put(*foliop);
		*foliop = NULL;
		if (IS_ERR(snapc))
			return PTR_ERR(snapc);

		ceph_queue_writeback(inode);
		r = wait_event_killable(ci->i_cap_wq,
					context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
		return r == 0 ? -EAGAIN : r;
	}
	return 0;
}
/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len,
			    struct folio **foliop, void **fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	int r;

	r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, foliop, NULL);
	if (r < 0)
		return r;

	folio_wait_private_2(*foliop); /* [DEPRECATED] */
	WARN_ON_ONCE(!folio_test_locked(*foliop));
	return 0;
}
/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct folio *folio, void *fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	bool check_cap = false;

	doutc(cl, "%llx.%llx file %p folio %p %d~%d (%d)\n", ceph_vinop(inode),
	      file, folio, (int)pos, (int)copied, (int)len);

	if (!folio_test_uptodate(folio)) {
		/* just return that nothing was copied on a short copy */
		if (copied < len) {
			copied = 0;
			goto out;
		}
		folio_mark_uptodate(folio);
	}

	/* did file size increase? */
	if (pos+copied > i_size_read(inode))
		check_cap = ceph_inode_set_size(inode, pos+copied);

	folio_mark_dirty(folio);

out:
	folio_unlock(folio);
	folio_put(folio);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY);

	return copied;
}
const struct address_space_operations ceph_aops = {
	.read_folio = netfs_read_folio,
	.readahead = netfs_readahead,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.dirty_folio = ceph_dirty_folio,
	.invalidate_folio = ceph_invalidate_folio,
	.release_folio = netfs_release_folio,
	.direct_IO = noop_direct_IO,
};
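/*
 * Block every signal except SIGKILL while we are inside the fault
 * handlers below, so that waits for capabilities can only be interrupted
 * by a fatal signal; the original mask is restored afterwards.
 */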
static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;

	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}
static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	if (ceph_inode_is_shutdown(inode))
		return ret;

	ceph_block_sigs(&oldset);

	doutc(cl, "%llx.%llx %llu trying to get caps\n",
	      ceph_vinop(inode), off);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got);
	if (err < 0)
		goto out_restore;

	doutc(cl, "%llx.%llx %llu got cap refs on %s\n", ceph_vinop(inode),
	      off, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    !ceph_has_inline_data(ci)) {
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
		ret = filemap_fault(vmf);
		ceph_del_rw_context(fi, &rw_ctx);
		doutc(cl, "%llx.%llx %llu drop cap refs %s ret %x\n",
		      ceph_vinop(inode), off, ceph_cap_string(got), ret);
	} else
		err = -EAGAIN;

	ceph_put_cap_refs(ci, got);

	if (err != -EAGAIN)
		goto out_restore;

	/* read inline data */
	if (off >= PAGE_SIZE) {
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		struct address_space *mapping = inode->i_mapping;
		struct page *page;

		filemap_invalidate_lock_shared(mapping);
		page = find_or_create_page(mapping, 0,
				mapping_gfp_constraint(mapping, ~__GFP_FS));
		if (!page) {
			ret = VM_FAULT_OOM;
			goto out_inline;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0 || off >= i_size_read(inode)) {
			unlock_page(page);
			put_page(page);
			ret = vmf_error(err);
			goto out_inline;
		}
		if (err < PAGE_SIZE)
			zero_user_segment(page, err, PAGE_SIZE);
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
		filemap_invalidate_unlock_shared(mapping);
		doutc(cl, "%llx.%llx %llu read inline data ret %x\n",
		      ceph_vinop(inode), off, ret);
	}
out_restore:
	ceph_restore_sigs(&oldset);
	if (err < 0)
		ret = vmf_error(err);

	return ret;
}
static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	if (ceph_inode_is_shutdown(inode))
		return ret;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	sb_start_pagefault(inode->i_sb);
	ceph_block_sigs(&oldset);

	if (off + thp_size(page) <= size)
		len = thp_size(page);
	else
		len = offset_in_thp(page, size);

	doutc(cl, "%llx.%llx %llu~%zd getting caps i_size %llu\n",
	      ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
	if (err < 0)
		goto out_free;

	doutc(cl, "%llx.%llx %llu~%zd got cap refs on %s\n", ceph_vinop(inode),
	      off, len, ceph_cap_string(got));

	/* Update time before taking page lock */
	file_update_time(vma->vm_file);
	inode_inc_iversion_raw(inode);

	do {
		struct ceph_snap_context *snapc;

		lock_page(page);

		if (page_mkwrite_check_truncate(page, inode) < 0) {
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		snapc = ceph_find_incompatible(page);
		if (!snapc) {
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
			break;
		}

		unlock_page(page);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
		}

		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);

	if (ret == VM_FAULT_LOCKED) {
		int dirty;

		spin_lock(&ci->i_ceph_lock);
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	doutc(cl, "%llx.%llx %llu~%zd dropping cap refs on %s ret %x\n",
	      ceph_vinop(inode), off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs_async(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	sb_end_pagefault(inode->i_sb);
	ceph_free_cap_flush(prealloc_cf);
	if (err < 0)
		ret = vmf_error(err);
	return ret;
}
void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char *data, size_t len)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
					   mapping_gfp_constraint(mapping,
					   ~__GFP_FS));
		if (!page)
			return;
		if (PageUptodate(page)) {
			unlock_page(page);
			put_page(page);
			return;
		}
	}

	doutc(cl, "%p %llx.%llx len %zu locked_page %p\n", inode,
	      ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);
		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	if (page != locked_page) {
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
		put_page(page);
	}
}
int ceph_uninline_data(struct file *file)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_request *req = NULL;
	struct ceph_cap_flush *prealloc_cf = NULL;
	struct folio *folio = NULL;
	u64 inline_version = CEPH_INLINE_NONE;
	struct page *pages[1];
	int err = 0;
	u64 len;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	doutc(cl, "%llx.%llx inline_version %llu\n", ceph_vinop(inode),
	      inline_version);

	if (ceph_inode_is_shutdown(inode)) {
		err = -EIO;
		goto out;
	}

	if (inline_version == CEPH_INLINE_NONE)
		return 0;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return -ENOMEM;

	if (inline_version == 1) /* initial version, no data */
		goto out_uninline;

	folio = read_mapping_folio(inode->i_mapping, 0, file);
	if (IS_ERR(folio)) {
		err = PTR_ERR(folio);
		goto out;
	}

	folio_lock(folio);

	len = i_size_read(inode);
	if (len > folio_size(folio))
		len = folio_size(folio);

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	req->r_mtime = inode_get_mtime(inode);
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out_unlock;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	pages[0] = folio_page(folio, 0);
	osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);

	{
		__le64 xattr_buf = cpu_to_le64(inline_version);

		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put_req;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);

		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put_req;
	}

	req->r_mtime = inode_get_mtime(inode);
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

out_uninline:
	if (!err) {
		int dirty;

		/* Set to CAP_INLINE_NONE and dirty the caps */
		down_read(&fsc->mdsc->snap_rwsem);
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		up_read(&fsc->mdsc->snap_rwsem);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}
out_put_req:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)
		err = 0;
out_unlock:
	if (folio) {
		folio_unlock(folio);
		folio_put(folio);
	}
out:
	ceph_free_cap_flush(prealloc_cf);
	doutc(cl, "%llx.%llx inline_version %llu = %d\n",
	      ceph_vinop(inode), inline_version, err);
	return err;
}
static const struct vm_operations_struct ceph_vmops = {
	.fault		= ceph_filemap_fault,
	.page_mkwrite	= ceph_page_mkwrite,
};
int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->read_folio)
		return -ENOEXEC;
	vma->vm_ops = &ceph_vmops;
	return 0;
}
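/*
 * Pool permission probing: to find out whether the client may read and/or
 * write the data pool, issue a STAT read and an exclusive CREATE write
 * against the inode's first object and interpret the EPERM/ENOENT/EEXIST
 * results.  The outcome is cached per (pool, namespace) in
 * mdsc->pool_perm_tree so the round trips only happen once.
 */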
enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(&ci->netfs.inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		doutc(cl, "pool %lld ns %.*s no perm cached\n", pool,
		      (int)pool_ns->len, pool_ns->str);
	else
		doutc(cl, "pool %lld no perm cached\n", pool);

	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	ceph_osdc_start_request(&fsc->client->osdc, rd_req);

	wr_req->r_mtime = inode_get_mtime(&ci->netfs.inode);
	ceph_osdc_start_request(&fsc->client->osdc, wr_req);

	err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(struct_size(perm, pool_ns, pool_ns_len + 1), GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		doutc(cl, "pool %lld ns %.*s result = %d\n", pool,
		      (int)pool_ns->len, pool_ns->str, err);
	else
		doutc(cl, "pool %lld result = %d\n", pool, err);
	return err;
}
int ceph_pool_perm_check(struct inode *inode, int need)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_string *pool_ns;
	s64 pool;
	int ret, flags;

	/* Only need to do this for regular files */
	if (!S_ISREG(inode->i_mode))
		return 0;

	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * Pool permission check needs to write to the first object.
		 * But for snapshot, head of the first object may have already
		 * been deleted. Skip check to avoid creating orphan object.
		 */
		return 0;
	}

	if (ceph_test_mount_opt(ceph_inode_to_fs_client(inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ci->i_layout.pool_id;
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			doutc(cl, "pool %lld no read perm\n", pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			doutc(cl, "pool %lld no write perm\n", pool);
			return -EPERM;
		}
		return 0;
	}

	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
	} else {
		pool = ci->i_layout.pool_id;
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}
void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}