Merge tag 'rproc-v6.14' of git://git.kernel.org/pub/scm/linux/kernel/git/remoteproc...
[linux.git] / fs / nfs / localio.c
blob4b8618cf114caf4c52794ae734ab9de769f12934
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * NFS client support for local clients to bypass network stack
5 * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com>
6 * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com>
7 * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com>
8 * Copyright (C) 2024 NeilBrown <neilb@suse.de>
9 */
11 #include <linux/module.h>
12 #include <linux/errno.h>
13 #include <linux/vfs.h>
14 #include <linux/file.h>
15 #include <linux/inet.h>
16 #include <linux/sunrpc/addr.h>
17 #include <linux/inetdevice.h>
18 #include <net/addrconf.h>
19 #include <linux/nfs_common.h>
20 #include <linux/nfslocalio.h>
21 #include <linux/bvec.h>
23 #include <linux/nfs.h>
24 #include <linux/nfs_fs.h>
25 #include <linux/nfs_xdr.h>
27 #include "internal.h"
28 #include "pnfs.h"
29 #include "nfstrace.h"
31 #define NFSDBG_FACILITY NFSDBG_VFS
33 struct nfs_local_kiocb {
34 struct kiocb kiocb;
35 struct bio_vec *bvec;
36 struct nfs_pgio_header *hdr;
37 struct work_struct work;
38 struct nfsd_file *localio;
41 struct nfs_local_fsync_ctx {
42 struct nfsd_file *localio;
43 struct nfs_commit_data *data;
44 struct work_struct work;
45 struct completion *done;
48 static bool localio_enabled __read_mostly = true;
49 module_param(localio_enabled, bool, 0644);
51 static inline bool nfs_client_is_local(const struct nfs_client *clp)
53 return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
56 bool nfs_server_is_local(const struct nfs_client *clp)
58 return nfs_client_is_local(clp) && localio_enabled;
60 EXPORT_SYMBOL_GPL(nfs_server_is_local);
63 * UUID_IS_LOCAL XDR functions
66 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
67 struct xdr_stream *xdr,
68 const void *data)
70 const u8 *uuid = data;
72 encode_opaque_fixed(xdr, uuid, UUID_SIZE);
/*
 * Decode the UUID_IS_LOCAL reply.  The procedure has no result body, so
 * there is nothing to read from @xdr and decoding always succeeds.
 */
static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
				   struct xdr_stream *xdr,
				   void *result)
{
	/* void return */
	return 0;
}
83 static const struct rpc_procinfo nfs_localio_procedures[] = {
84 [LOCALIOPROC_UUID_IS_LOCAL] = {
85 .p_proc = LOCALIOPROC_UUID_IS_LOCAL,
86 .p_encode = localio_xdr_enc_uuidargs,
87 .p_decode = localio_xdr_dec_uuidres,
88 .p_arglen = XDR_QUADLEN(UUID_SIZE),
89 .p_replen = 0,
90 .p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
91 .p_name = "UUID_IS_LOCAL",
95 static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
96 static const struct rpc_version nfslocalio_version1 = {
97 .number = 1,
98 .nrprocs = ARRAY_SIZE(nfs_localio_procedures),
99 .procs = nfs_localio_procedures,
100 .counts = nfs_localio_counts,
103 static const struct rpc_version *nfslocalio_version[] = {
104 [1] = &nfslocalio_version1,
107 extern const struct rpc_program nfslocalio_program;
108 static struct rpc_stat nfslocalio_rpcstat = { &nfslocalio_program };
110 const struct rpc_program nfslocalio_program = {
111 .name = "nfslocalio",
112 .number = NFS_LOCALIO_PROGRAM,
113 .nrvers = ARRAY_SIZE(nfslocalio_version),
114 .version = nfslocalio_version,
115 .stats = &nfslocalio_rpcstat,
119 * nfs_local_enable - enable local i/o for an nfs_client
121 static void nfs_local_enable(struct nfs_client *clp)
123 spin_lock(&clp->cl_localio_lock);
124 set_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
125 trace_nfs_local_enable(clp);
126 spin_unlock(&clp->cl_localio_lock);
130 * nfs_local_disable - disable local i/o for an nfs_client
132 void nfs_local_disable(struct nfs_client *clp)
134 spin_lock(&clp->cl_localio_lock);
135 if (test_and_clear_bit(NFS_CS_LOCAL_IO, &clp->cl_flags)) {
136 trace_nfs_local_disable(clp);
137 nfs_uuid_invalidate_one_client(&clp->cl_uuid);
139 spin_unlock(&clp->cl_localio_lock);
143 * nfs_init_localioclient - Initialise an NFS localio client connection
145 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
147 struct rpc_clnt *rpcclient_localio;
149 rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
150 &nfslocalio_program, 1);
152 dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
153 __func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
154 (IS_ERR(rpcclient_localio) ? "does not support" : "supports"));
156 return rpcclient_localio;
159 static bool nfs_server_uuid_is_local(struct nfs_client *clp)
161 u8 uuid[UUID_SIZE];
162 struct rpc_message msg = {
163 .rpc_argp = &uuid,
165 struct rpc_clnt *rpcclient_localio;
166 int status;
168 rpcclient_localio = nfs_init_localioclient(clp);
169 if (IS_ERR(rpcclient_localio))
170 return false;
172 export_uuid(uuid, &clp->cl_uuid.uuid);
174 msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
175 status = rpc_call_sync(rpcclient_localio, &msg, 0);
176 dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
177 __func__, status);
178 rpc_shutdown_client(rpcclient_localio);
180 /* Server is only local if it initialized required struct members */
181 if (status || !clp->cl_uuid.net || !clp->cl_uuid.dom)
182 return false;
184 return true;
188 * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
189 * - called after alloc_client and init_client (so cl_rpcclient exists)
190 * - this function is idempotent, it can be called for old or new clients
192 void nfs_local_probe(struct nfs_client *clp)
194 /* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
195 if (!localio_enabled ||
196 clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
197 nfs_local_disable(clp);
198 return;
201 if (nfs_client_is_local(clp)) {
202 /* If already enabled, disable and re-enable */
203 nfs_local_disable(clp);
206 if (!nfs_uuid_begin(&clp->cl_uuid))
207 return;
208 if (nfs_server_uuid_is_local(clp))
209 nfs_local_enable(clp);
210 nfs_uuid_end(&clp->cl_uuid);
212 EXPORT_SYMBOL_GPL(nfs_local_probe);
215 * nfs_local_open_fh - open a local filehandle in terms of nfsd_file
217 * Returns a pointer to a struct nfsd_file or NULL
219 struct nfsd_file *
220 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
221 struct nfs_fh *fh, const fmode_t mode)
223 struct nfsd_file *localio;
224 int status;
226 if (!nfs_server_is_local(clp))
227 return NULL;
228 if (mode & ~(FMODE_READ | FMODE_WRITE))
229 return NULL;
231 localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
232 cred, fh, mode);
233 if (IS_ERR(localio)) {
234 status = PTR_ERR(localio);
235 trace_nfs_local_open_fh(fh, mode, status);
236 switch (status) {
237 case -ENOMEM:
238 case -ENXIO:
239 case -ENOENT:
240 /* Revalidate localio, will disable if unsupported */
241 nfs_local_probe(clp);
243 return NULL;
245 return localio;
247 EXPORT_SYMBOL_GPL(nfs_local_open_fh);
249 static struct bio_vec *
250 nfs_bvec_alloc_and_import_pagevec(struct page **pagevec,
251 unsigned int npages, gfp_t flags)
253 struct bio_vec *bvec, *p;
255 bvec = kmalloc_array(npages, sizeof(*bvec), flags);
256 if (bvec != NULL) {
257 for (p = bvec; npages > 0; p++, pagevec++, npages--) {
258 p->bv_page = *pagevec;
259 p->bv_len = PAGE_SIZE;
260 p->bv_offset = 0;
263 return bvec;
266 static void
267 nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
269 kfree(iocb->bvec);
270 kfree(iocb);
273 static struct nfs_local_kiocb *
274 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
275 struct file *file, gfp_t flags)
277 struct nfs_local_kiocb *iocb;
279 iocb = kmalloc(sizeof(*iocb), flags);
280 if (iocb == NULL)
281 return NULL;
282 iocb->bvec = nfs_bvec_alloc_and_import_pagevec(hdr->page_array.pagevec,
283 hdr->page_array.npages, flags);
284 if (iocb->bvec == NULL) {
285 kfree(iocb);
286 return NULL;
288 init_sync_kiocb(&iocb->kiocb, file);
289 iocb->kiocb.ki_pos = hdr->args.offset;
290 iocb->hdr = hdr;
291 iocb->kiocb.ki_flags &= ~IOCB_APPEND;
292 return iocb;
295 static void
296 nfs_local_iter_init(struct iov_iter *i, struct nfs_local_kiocb *iocb, int dir)
298 struct nfs_pgio_header *hdr = iocb->hdr;
300 iov_iter_bvec(i, dir, iocb->bvec, hdr->page_array.npages,
301 hdr->args.count + hdr->args.pgbase);
302 if (hdr->args.pgbase != 0)
303 iov_iter_advance(i, hdr->args.pgbase);
306 static void
307 nfs_local_hdr_release(struct nfs_pgio_header *hdr,
308 const struct rpc_call_ops *call_ops)
310 call_ops->rpc_call_done(&hdr->task, hdr);
311 call_ops->rpc_release(hdr);
314 static void
315 nfs_local_pgio_init(struct nfs_pgio_header *hdr,
316 const struct rpc_call_ops *call_ops)
318 hdr->task.tk_ops = call_ops;
319 if (!hdr->task.tk_start)
320 hdr->task.tk_start = ktime_get();
323 static void
324 nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
326 if (status >= 0) {
327 hdr->res.count = status;
328 hdr->res.op_status = NFS4_OK;
329 hdr->task.tk_status = 0;
330 } else {
331 hdr->res.op_status = nfs4_stat_to_errno(status);
332 hdr->task.tk_status = status;
336 static void
337 nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
339 struct nfs_pgio_header *hdr = iocb->hdr;
341 nfs_to_nfsd_file_put_local(iocb->localio);
342 nfs_local_iocb_free(iocb);
343 nfs_local_hdr_release(hdr, hdr->task.tk_ops);
346 static void
347 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
349 struct nfs_pgio_header *hdr = iocb->hdr;
350 struct file *filp = iocb->kiocb.ki_filp;
352 nfs_local_pgio_done(hdr, status);
355 * Must clear replen otherwise NFSv3 data corruption will occur
356 * if/when switching from LOCALIO back to using normal RPC.
358 hdr->res.replen = 0;
360 if (hdr->res.count != hdr->args.count ||
361 hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
362 hdr->res.eof = true;
364 dprintk("%s: read %ld bytes eof %d.\n", __func__,
365 status > 0 ? status : 0, hdr->res.eof);
368 static void nfs_local_call_read(struct work_struct *work)
370 struct nfs_local_kiocb *iocb =
371 container_of(work, struct nfs_local_kiocb, work);
372 struct file *filp = iocb->kiocb.ki_filp;
373 const struct cred *save_cred;
374 struct iov_iter iter;
375 ssize_t status;
377 save_cred = override_creds(filp->f_cred);
379 nfs_local_iter_init(&iter, iocb, READ);
381 status = filp->f_op->read_iter(&iocb->kiocb, &iter);
382 WARN_ON_ONCE(status == -EIOCBQUEUED);
384 nfs_local_read_done(iocb, status);
385 nfs_local_pgio_release(iocb);
387 revert_creds(save_cred);
390 static int
391 nfs_do_local_read(struct nfs_pgio_header *hdr,
392 struct nfsd_file *localio,
393 const struct rpc_call_ops *call_ops)
395 struct nfs_local_kiocb *iocb;
396 struct file *file = nfs_to->nfsd_file_file(localio);
398 /* Don't support filesystems without read_iter */
399 if (!file->f_op->read_iter)
400 return -EAGAIN;
402 dprintk("%s: vfs_read count=%u pos=%llu\n",
403 __func__, hdr->args.count, hdr->args.offset);
405 iocb = nfs_local_iocb_alloc(hdr, file, GFP_KERNEL);
406 if (iocb == NULL)
407 return -ENOMEM;
408 iocb->localio = localio;
410 nfs_local_pgio_init(hdr, call_ops);
411 hdr->res.eof = false;
413 INIT_WORK(&iocb->work, nfs_local_call_read);
414 queue_work(nfslocaliod_workqueue, &iocb->work);
416 return 0;
419 static void
420 nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
422 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
423 u32 *verf = (u32 *)verifier->data;
424 int seq = 0;
426 do {
427 read_seqbegin_or_lock(&clp->cl_boot_lock, &seq);
428 verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
429 verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
430 } while (need_seqretry(&clp->cl_boot_lock, seq));
431 done_seqretry(&clp->cl_boot_lock, seq);
434 static void
435 nfs_reset_boot_verifier(struct inode *inode)
437 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
439 write_seqlock(&clp->cl_boot_lock);
440 ktime_get_real_ts64(&clp->cl_nfssvc_boot);
441 write_sequnlock(&clp->cl_boot_lock);
444 static void
445 nfs_set_local_verifier(struct inode *inode,
446 struct nfs_writeverf *verf,
447 enum nfs3_stable_how how)
449 nfs_copy_boot_verifier(&verf->verifier, inode);
450 verf->committed = how;
453 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */
454 static int __vfs_getattr(struct path *p, struct kstat *stat, int version)
456 u32 request_mask = STATX_BASIC_STATS;
458 if (version == 4)
459 request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
460 return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
463 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
464 static u64 __nfsd4_change_attribute(const struct kstat *stat,
465 const struct inode *inode)
467 u64 chattr;
469 if (stat->result_mask & STATX_CHANGE_COOKIE) {
470 chattr = stat->change_cookie;
471 if (S_ISREG(inode->i_mode) &&
472 !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
473 chattr += (u64)stat->ctime.tv_sec << 30;
474 chattr += stat->ctime.tv_nsec;
476 } else {
477 chattr = time_to_chattr(&stat->ctime);
479 return chattr;
482 static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
484 struct kstat stat;
485 struct file *filp = iocb->kiocb.ki_filp;
486 struct nfs_pgio_header *hdr = iocb->hdr;
487 struct nfs_fattr *fattr = hdr->res.fattr;
488 int version = NFS_PROTO(hdr->inode)->version;
490 if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
491 return;
493 fattr->valid = (NFS_ATTR_FATTR_FILEID |
494 NFS_ATTR_FATTR_CHANGE |
495 NFS_ATTR_FATTR_SIZE |
496 NFS_ATTR_FATTR_ATIME |
497 NFS_ATTR_FATTR_MTIME |
498 NFS_ATTR_FATTR_CTIME |
499 NFS_ATTR_FATTR_SPACE_USED);
501 fattr->fileid = stat.ino;
502 fattr->size = stat.size;
503 fattr->atime = stat.atime;
504 fattr->mtime = stat.mtime;
505 fattr->ctime = stat.ctime;
506 if (version == 4) {
507 fattr->change_attr =
508 __nfsd4_change_attribute(&stat, file_inode(filp));
509 } else
510 fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
511 fattr->du.nfs3.used = stat.blocks << 9;
514 static void
515 nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
517 struct nfs_pgio_header *hdr = iocb->hdr;
518 struct inode *inode = hdr->inode;
520 dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);
522 /* Handle short writes as if they are ENOSPC */
523 if (status > 0 && status < hdr->args.count) {
524 hdr->mds_offset += status;
525 hdr->args.offset += status;
526 hdr->args.pgbase += status;
527 hdr->args.count -= status;
528 nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
529 status = -ENOSPC;
531 if (status < 0)
532 nfs_reset_boot_verifier(inode);
534 nfs_local_pgio_done(hdr, status);
537 static void nfs_local_call_write(struct work_struct *work)
539 struct nfs_local_kiocb *iocb =
540 container_of(work, struct nfs_local_kiocb, work);
541 struct file *filp = iocb->kiocb.ki_filp;
542 unsigned long old_flags = current->flags;
543 const struct cred *save_cred;
544 struct iov_iter iter;
545 ssize_t status;
547 current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
548 save_cred = override_creds(filp->f_cred);
550 nfs_local_iter_init(&iter, iocb, WRITE);
552 file_start_write(filp);
553 status = filp->f_op->write_iter(&iocb->kiocb, &iter);
554 file_end_write(filp);
555 WARN_ON_ONCE(status == -EIOCBQUEUED);
557 nfs_local_write_done(iocb, status);
558 nfs_local_vfs_getattr(iocb);
559 nfs_local_pgio_release(iocb);
561 revert_creds(save_cred);
562 current->flags = old_flags;
565 static int
566 nfs_do_local_write(struct nfs_pgio_header *hdr,
567 struct nfsd_file *localio,
568 const struct rpc_call_ops *call_ops)
570 struct nfs_local_kiocb *iocb;
571 struct file *file = nfs_to->nfsd_file_file(localio);
573 /* Don't support filesystems without write_iter */
574 if (!file->f_op->write_iter)
575 return -EAGAIN;
577 dprintk("%s: vfs_write count=%u pos=%llu %s\n",
578 __func__, hdr->args.count, hdr->args.offset,
579 (hdr->args.stable == NFS_UNSTABLE) ? "unstable" : "stable");
581 iocb = nfs_local_iocb_alloc(hdr, file, GFP_NOIO);
582 if (iocb == NULL)
583 return -ENOMEM;
584 iocb->localio = localio;
586 switch (hdr->args.stable) {
587 default:
588 break;
589 case NFS_DATA_SYNC:
590 iocb->kiocb.ki_flags |= IOCB_DSYNC;
591 break;
592 case NFS_FILE_SYNC:
593 iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
595 nfs_local_pgio_init(hdr, call_ops);
597 nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
599 INIT_WORK(&iocb->work, nfs_local_call_write);
600 queue_work(nfslocaliod_workqueue, &iocb->work);
602 return 0;
605 int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
606 struct nfs_pgio_header *hdr,
607 const struct rpc_call_ops *call_ops)
609 int status = 0;
611 if (!hdr->args.count)
612 return 0;
614 switch (hdr->rw_mode) {
615 case FMODE_READ:
616 status = nfs_do_local_read(hdr, localio, call_ops);
617 break;
618 case FMODE_WRITE:
619 status = nfs_do_local_write(hdr, localio, call_ops);
620 break;
621 default:
622 dprintk("%s: invalid mode: %d\n", __func__,
623 hdr->rw_mode);
624 status = -EINVAL;
627 if (status != 0) {
628 if (status == -EAGAIN)
629 nfs_local_disable(clp);
630 nfs_to_nfsd_file_put_local(localio);
631 hdr->task.tk_status = status;
632 nfs_local_hdr_release(hdr, call_ops);
634 return status;
637 static void
638 nfs_local_init_commit(struct nfs_commit_data *data,
639 const struct rpc_call_ops *call_ops)
641 data->task.tk_ops = call_ops;
644 static int
645 nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
647 loff_t start = data->args.offset;
648 loff_t end = LLONG_MAX;
650 if (data->args.count > 0) {
651 end = start + data->args.count - 1;
652 if (end < start)
653 end = LLONG_MAX;
656 dprintk("%s: commit %llu - %llu\n", __func__, start, end);
657 return vfs_fsync_range(filp, start, end, 0);
660 static void
661 nfs_local_commit_done(struct nfs_commit_data *data, int status)
663 if (status >= 0) {
664 nfs_set_local_verifier(data->inode,
665 data->res.verf,
666 NFS_FILE_SYNC);
667 data->res.op_status = NFS4_OK;
668 data->task.tk_status = 0;
669 } else {
670 nfs_reset_boot_verifier(data->inode);
671 data->res.op_status = nfs4_stat_to_errno(status);
672 data->task.tk_status = status;
676 static void
677 nfs_local_release_commit_data(struct nfsd_file *localio,
678 struct nfs_commit_data *data,
679 const struct rpc_call_ops *call_ops)
681 nfs_to_nfsd_file_put_local(localio);
682 call_ops->rpc_call_done(&data->task, data);
683 call_ops->rpc_release(data);
686 static void
687 nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
689 nfs_local_release_commit_data(ctx->localio, ctx->data,
690 ctx->data->task.tk_ops);
691 kfree(ctx);
694 static void
695 nfs_local_fsync_work(struct work_struct *work)
697 struct nfs_local_fsync_ctx *ctx;
698 int status;
700 ctx = container_of(work, struct nfs_local_fsync_ctx, work);
702 status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
703 ctx->data);
704 nfs_local_commit_done(ctx->data, status);
705 if (ctx->done != NULL)
706 complete(ctx->done);
707 nfs_local_fsync_ctx_free(ctx);
710 static struct nfs_local_fsync_ctx *
711 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
712 struct nfsd_file *localio, gfp_t flags)
714 struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);
716 if (ctx != NULL) {
717 ctx->localio = localio;
718 ctx->data = data;
719 INIT_WORK(&ctx->work, nfs_local_fsync_work);
720 ctx->done = NULL;
722 return ctx;
725 int nfs_local_commit(struct nfsd_file *localio,
726 struct nfs_commit_data *data,
727 const struct rpc_call_ops *call_ops, int how)
729 struct nfs_local_fsync_ctx *ctx;
731 ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
732 if (!ctx) {
733 nfs_local_commit_done(data, -ENOMEM);
734 nfs_local_release_commit_data(localio, data, call_ops);
735 return -ENOMEM;
738 nfs_local_init_commit(data, call_ops);
740 if (how & FLUSH_SYNC) {
741 DECLARE_COMPLETION_ONSTACK(done);
742 ctx->done = &done;
743 queue_work(nfsiod_workqueue, &ctx->work);
744 wait_for_completion(&done);
745 } else
746 queue_work(nfsiod_workqueue, &ctx->work);
748 return 0;