2 * linux/net/sunrpc/xprt.c
4 * This is a generic RPC call interface supporting congestion avoidance,
5 * and asynchronous calls.
7 * The interface works like this:
9 * - When a process places a call, it allocates a request slot if
10 * one is available. Otherwise, it sleeps on the backlog queue
12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_call().
14 * - xprt_call transmits the message and installs the caller on the
15 * socket's wait list. At the same time, it installs a timer that
16 * is run after the packet's timeout has expired.
17 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that socket. If a matching XID is found, the
19 * caller is woken up, and the timer removed.
20 * - When no reply arrives within the timeout interval, the timer is
21 * fired by the kernel and runs xprt_timer(). It either adjusts the
22 * timeout values (minor timeout) or wakes up the caller with a status
24 * - When the caller receives a notification from RPC that a reply arrived,
25 * it should release the RPC slot, and process the reply.
26 * If the call timed out, it may choose to retry the operation by
27 * adjusting the initial timeout value, and simply calling rpc_call
30 * Support for async RPC is done through a set of RPC-specific scheduling
31 * primitives that `transparently' work for processes as well as async
32 * tasks that rely on callbacks.
34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
36 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38 * TCP NFS related read + write fixes
39 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
41 * Rewrite of large parts of the code in order to stabilize TCP stuff.
42 * Fix behaviour when socket buffer is full.
43 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
46 #include <linux/types.h>
47 #include <linux/slab.h>
48 #include <linux/capability.h>
49 #include <linux/sched.h>
50 #include <linux/errno.h>
51 #include <linux/socket.h>
53 #include <linux/net.h>
55 #include <linux/udp.h>
56 #include <linux/tcp.h>
57 #include <linux/sunrpc/clnt.h>
58 #include <linux/file.h>
59 #include <linux/workqueue.h>
60 #include <linux/random.h>
63 #include <net/checksum.h>
72 # undef RPC_DEBUG_DATA
73 # define RPCDBG_FACILITY RPCDBG_XPRT
76 #define XPRT_MAX_BACKOFF (8)
77 #define XPRT_IDLE_TIMEOUT (5*60*HZ)
78 #define XPRT_MAX_RESVPORT (800)
83 static void xprt_request_init(struct rpc_task
*, struct rpc_xprt
*);
84 static inline void do_xprt_reserve(struct rpc_task
*);
85 static void xprt_disconnect(struct rpc_xprt
*);
86 static void xprt_connect_status(struct rpc_task
*task
);
87 static struct rpc_xprt
* xprt_setup(int proto
, struct sockaddr_in
*ap
,
88 struct rpc_timeout
*to
);
89 static struct socket
*xprt_create_socket(struct rpc_xprt
*, int, int);
90 static void xprt_bind_socket(struct rpc_xprt
*, struct socket
*);
91 static int __xprt_get_cong(struct rpc_xprt
*, struct rpc_task
*);
93 static int xprt_clear_backlog(struct rpc_xprt
*xprt
);
97 * Print the buffer contents (first 128 bytes only--just enough for
101 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
103 u8
*buf
= (u8
*) packet
;
106 dprintk("RPC: %s\n", msg
);
107 for (j
= 0; j
< count
&& j
< 128; j
+= 4) {
111 dprintk("0x%04x ", j
);
113 dprintk("%02x%02x%02x%02x ",
114 buf
[j
], buf
[j
+1], buf
[j
+2], buf
[j
+3]);
120 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
127 * Look up RPC transport given an INET socket
129 static inline struct rpc_xprt
*
130 xprt_from_sock(struct sock
*sk
)
132 return (struct rpc_xprt
*) sk
->sk_user_data
;
136 * Serialize write access to sockets, in order to prevent different
137 * requests from interfering with each other.
138 * Also prevents TCP socket connects from colliding with writes.
141 __xprt_lock_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
143 struct rpc_rqst
*req
= task
->tk_rqstp
;
145 if (test_and_set_bit(XPRT_LOCKED
, &xprt
->sockstate
)) {
146 if (task
== xprt
->snd_task
)
150 if (xprt
->nocong
|| __xprt_get_cong(xprt
, task
)) {
151 xprt
->snd_task
= task
;
153 req
->rq_bytes_sent
= 0;
158 smp_mb__before_clear_bit();
159 clear_bit(XPRT_LOCKED
, &xprt
->sockstate
);
160 smp_mb__after_clear_bit();
162 dprintk("RPC: %4d failed to lock socket %p\n", task
->tk_pid
, xprt
);
163 task
->tk_timeout
= 0;
164 task
->tk_status
= -EAGAIN
;
165 if (req
&& req
->rq_ntrans
)
166 rpc_sleep_on(&xprt
->resend
, task
, NULL
, NULL
);
168 rpc_sleep_on(&xprt
->sending
, task
, NULL
, NULL
);
173 xprt_lock_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
177 spin_lock_bh(&xprt
->sock_lock
);
178 retval
= __xprt_lock_write(xprt
, task
);
179 spin_unlock_bh(&xprt
->sock_lock
);
185 __xprt_lock_write_next(struct rpc_xprt
*xprt
)
187 struct rpc_task
*task
;
189 if (test_and_set_bit(XPRT_LOCKED
, &xprt
->sockstate
))
191 if (!xprt
->nocong
&& RPCXPRT_CONGESTED(xprt
))
193 task
= rpc_wake_up_next(&xprt
->resend
);
195 task
= rpc_wake_up_next(&xprt
->sending
);
199 if (xprt
->nocong
|| __xprt_get_cong(xprt
, task
)) {
200 struct rpc_rqst
*req
= task
->tk_rqstp
;
201 xprt
->snd_task
= task
;
203 req
->rq_bytes_sent
= 0;
209 smp_mb__before_clear_bit();
210 clear_bit(XPRT_LOCKED
, &xprt
->sockstate
);
211 smp_mb__after_clear_bit();
215 * Releases the socket for use by other requests.
218 __xprt_release_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
220 if (xprt
->snd_task
== task
) {
221 xprt
->snd_task
= NULL
;
222 smp_mb__before_clear_bit();
223 clear_bit(XPRT_LOCKED
, &xprt
->sockstate
);
224 smp_mb__after_clear_bit();
225 __xprt_lock_write_next(xprt
);
230 xprt_release_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
232 spin_lock_bh(&xprt
->sock_lock
);
233 __xprt_release_write(xprt
, task
);
234 spin_unlock_bh(&xprt
->sock_lock
);
238 * Write data to socket.
241 xprt_sendmsg(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
)
243 struct socket
*sock
= xprt
->sock
;
244 struct xdr_buf
*xdr
= &req
->rq_snd_buf
;
245 struct sockaddr
*addr
= NULL
;
253 xprt_pktdump("packet data:",
254 req
->rq_svec
->iov_base
,
255 req
->rq_svec
->iov_len
);
257 /* For UDP, we need to provide an address */
259 addr
= (struct sockaddr
*) &xprt
->addr
;
260 addrlen
= sizeof(xprt
->addr
);
262 /* Dont repeat bytes */
263 skip
= req
->rq_bytes_sent
;
265 clear_bit(SOCK_ASYNC_NOSPACE
, &sock
->flags
);
266 result
= xdr_sendpages(sock
, addr
, addrlen
, xdr
, skip
, MSG_DONTWAIT
);
268 dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr
->len
- skip
, result
);
275 /* When the server has died, an ICMP port unreachable message
276 * prompts ECONNREFUSED.
283 /* connection broken */
288 printk(KERN_NOTICE
"RPC: sendmsg returned error %d\n", -result
);
294 * Van Jacobson congestion avoidance. Check if the congestion window
295 * overflowed. Put the task to sleep if this is the case.
298 __xprt_get_cong(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
300 struct rpc_rqst
*req
= task
->tk_rqstp
;
304 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
305 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
306 if (RPCXPRT_CONGESTED(xprt
))
309 xprt
->cong
+= RPC_CWNDSCALE
;
314 * Adjust the congestion window, and wake up the next task
315 * that has been sleeping due to congestion
318 __xprt_put_cong(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
)
323 xprt
->cong
-= RPC_CWNDSCALE
;
324 __xprt_lock_write_next(xprt
);
328 * Adjust RPC congestion window
329 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
332 xprt_adjust_cwnd(struct rpc_xprt
*xprt
, int result
)
337 if (result
>= 0 && cwnd
<= xprt
->cong
) {
338 /* The (cwnd >> 1) term makes sure
339 * the result gets rounded properly. */
340 cwnd
+= (RPC_CWNDSCALE
* RPC_CWNDSCALE
+ (cwnd
>> 1)) / cwnd
;
341 if (cwnd
> RPC_MAXCWND(xprt
))
342 cwnd
= RPC_MAXCWND(xprt
);
343 __xprt_lock_write_next(xprt
);
344 } else if (result
== -ETIMEDOUT
) {
346 if (cwnd
< RPC_CWNDSCALE
)
347 cwnd
= RPC_CWNDSCALE
;
349 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
350 xprt
->cong
, xprt
->cwnd
, cwnd
);
355 * Reset the major timeout value
357 static void xprt_reset_majortimeo(struct rpc_rqst
*req
)
359 struct rpc_timeout
*to
= &req
->rq_xprt
->timeout
;
361 req
->rq_majortimeo
= req
->rq_timeout
;
362 if (to
->to_exponential
)
363 req
->rq_majortimeo
<<= to
->to_retries
;
365 req
->rq_majortimeo
+= to
->to_increment
* to
->to_retries
;
366 if (req
->rq_majortimeo
> to
->to_maxval
|| req
->rq_majortimeo
== 0)
367 req
->rq_majortimeo
= to
->to_maxval
;
368 req
->rq_majortimeo
+= jiffies
;
372 * Adjust timeout values etc for next retransmit
374 int xprt_adjust_timeout(struct rpc_rqst
*req
)
376 struct rpc_xprt
*xprt
= req
->rq_xprt
;
377 struct rpc_timeout
*to
= &xprt
->timeout
;
380 if (time_before(jiffies
, req
->rq_majortimeo
)) {
381 if (to
->to_exponential
)
382 req
->rq_timeout
<<= 1;
384 req
->rq_timeout
+= to
->to_increment
;
385 if (to
->to_maxval
&& req
->rq_timeout
>= to
->to_maxval
)
386 req
->rq_timeout
= to
->to_maxval
;
388 pprintk("RPC: %lu retrans\n", jiffies
);
390 req
->rq_timeout
= to
->to_initval
;
392 xprt_reset_majortimeo(req
);
393 /* Reset the RTT counters == "slow start" */
394 spin_lock_bh(&xprt
->sock_lock
);
395 rpc_init_rtt(req
->rq_task
->tk_client
->cl_rtt
, to
->to_initval
);
396 spin_unlock_bh(&xprt
->sock_lock
);
397 pprintk("RPC: %lu timeout\n", jiffies
);
401 if (req
->rq_timeout
== 0) {
402 printk(KERN_WARNING
"xprt_adjust_timeout: rq_timeout = 0!\n");
403 req
->rq_timeout
= 5 * HZ
;
409 * Close down a transport socket
412 xprt_close(struct rpc_xprt
*xprt
)
414 struct socket
*sock
= xprt
->sock
;
415 struct sock
*sk
= xprt
->inet
;
420 write_lock_bh(&sk
->sk_callback_lock
);
424 sk
->sk_user_data
= NULL
;
425 sk
->sk_data_ready
= xprt
->old_data_ready
;
426 sk
->sk_state_change
= xprt
->old_state_change
;
427 sk
->sk_write_space
= xprt
->old_write_space
;
428 write_unlock_bh(&sk
->sk_callback_lock
);
436 xprt_socket_autoclose(void *args
)
438 struct rpc_xprt
*xprt
= (struct rpc_xprt
*)args
;
440 xprt_disconnect(xprt
);
442 xprt_release_write(xprt
, NULL
);
446 * Mark a transport as disconnected
449 xprt_disconnect(struct rpc_xprt
*xprt
)
451 dprintk("RPC: disconnected transport %p\n", xprt
);
452 spin_lock_bh(&xprt
->sock_lock
);
453 xprt_clear_connected(xprt
);
454 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
455 spin_unlock_bh(&xprt
->sock_lock
);
459 * Used to allow disconnection when we've been idle
462 xprt_init_autodisconnect(unsigned long data
)
464 struct rpc_xprt
*xprt
= (struct rpc_xprt
*)data
;
466 spin_lock(&xprt
->sock_lock
);
467 if (!list_empty(&xprt
->recv
) || xprt
->shutdown
)
469 if (test_and_set_bit(XPRT_LOCKED
, &xprt
->sockstate
))
471 spin_unlock(&xprt
->sock_lock
);
472 /* Let keventd close the socket */
473 if (test_bit(XPRT_CONNECTING
, &xprt
->sockstate
) != 0)
474 xprt_release_write(xprt
, NULL
);
476 schedule_work(&xprt
->task_cleanup
);
479 spin_unlock(&xprt
->sock_lock
);
482 static void xprt_socket_connect(void *args
)
484 struct rpc_xprt
*xprt
= (struct rpc_xprt
*)args
;
485 struct socket
*sock
= xprt
->sock
;
488 if (xprt
->shutdown
|| xprt
->addr
.sin_port
== 0)
492 * Start by resetting any existing state
495 sock
= xprt_create_socket(xprt
, xprt
->prot
, xprt
->resvport
);
497 /* couldn't create socket or bind to reserved port;
498 * this is likely a permanent error, so cause an abort */
501 xprt_bind_socket(xprt
, sock
);
502 xprt_sock_setbufsize(xprt
);
509 * Tell the socket layer to start connecting...
511 status
= sock
->ops
->connect(sock
, (struct sockaddr
*) &xprt
->addr
,
512 sizeof(xprt
->addr
), O_NONBLOCK
);
513 dprintk("RPC: %p connect status %d connected %d sock state %d\n",
514 xprt
, -status
, xprt_connected(xprt
), sock
->sk
->sk_state
);
524 rpc_wake_up_status(&xprt
->pending
, status
);
526 rpc_wake_up(&xprt
->pending
);
528 smp_mb__before_clear_bit();
529 clear_bit(XPRT_CONNECTING
, &xprt
->sockstate
);
530 smp_mb__after_clear_bit();
534 * Attempt to connect a TCP socket.
537 void xprt_connect(struct rpc_task
*task
)
539 struct rpc_xprt
*xprt
= task
->tk_xprt
;
541 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task
->tk_pid
,
542 xprt
, (xprt_connected(xprt
) ? "is" : "is not"));
544 if (xprt
->shutdown
) {
545 task
->tk_status
= -EIO
;
548 if (!xprt
->addr
.sin_port
) {
549 task
->tk_status
= -EIO
;
552 if (!xprt_lock_write(xprt
, task
))
554 if (xprt_connected(xprt
))
558 task
->tk_rqstp
->rq_bytes_sent
= 0;
560 task
->tk_timeout
= RPC_CONNECT_TIMEOUT
;
561 rpc_sleep_on(&xprt
->pending
, task
, xprt_connect_status
, NULL
);
562 if (!test_and_set_bit(XPRT_CONNECTING
, &xprt
->sockstate
)) {
563 /* Note: if we are here due to a dropped connection
564 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
567 if (xprt
->sock
!= NULL
)
568 schedule_delayed_work(&xprt
->sock_connect
,
569 RPC_REESTABLISH_TIMEOUT
);
571 schedule_work(&xprt
->sock_connect
);
572 if (!RPC_IS_ASYNC(task
))
573 flush_scheduled_work();
578 xprt_release_write(xprt
, task
);
/*
 * We arrive here when awoken from waiting on connection establishment.
 *
 * Interprets task->tk_status set by the connect worker: known
 * transient errors are logged and (for hard mounts) retried by the
 * caller; unknown errors and soft mounts fail the RPC with -EIO.
 */
static void
xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status >= 0) {
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        /* NOTE(review): case labels below restored from context —
         * confirm the exact set against the original tree. */
        switch (task->tk_status) {
        case -ECONNREFUSED:
        case -ECONNRESET:
                dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n",
                                task->tk_pid, task->tk_client->cl_server);
                break;
        case -ENOTCONN:
                dprintk("RPC: %4d xprt_connect_status: connection broken\n",
                                task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n",
                                task->tk_pid);
                break;
        default:
                dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n",
                                task->tk_pid, -task->tk_status, task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
                return;
        }

        /* if soft mounted, just cause this RPC to fail */
        if (RPC_IS_SOFT(task)) {
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}
625 * Look up the RPC request corresponding to a reply, and then lock it.
627 static inline struct rpc_rqst
*
628 xprt_lookup_rqst(struct rpc_xprt
*xprt
, u32 xid
)
630 struct list_head
*pos
;
631 struct rpc_rqst
*req
= NULL
;
633 list_for_each(pos
, &xprt
->recv
) {
634 struct rpc_rqst
*entry
= list_entry(pos
, struct rpc_rqst
, rq_list
);
635 if (entry
->rq_xid
== xid
) {
644 * Complete reply received.
645 * The TCP code relies on us to remove the request from xprt->pending.
648 xprt_complete_rqst(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
, int copied
)
650 struct rpc_task
*task
= req
->rq_task
;
651 struct rpc_clnt
*clnt
= task
->tk_client
;
653 /* Adjust congestion window */
655 unsigned timer
= task
->tk_msg
.rpc_proc
->p_timer
;
656 xprt_adjust_cwnd(xprt
, copied
);
657 __xprt_put_cong(xprt
, req
);
659 if (req
->rq_ntrans
== 1)
660 rpc_update_rtt(clnt
->cl_rtt
, timer
,
661 (long)jiffies
- req
->rq_xtime
);
662 rpc_set_timeo(clnt
->cl_rtt
, timer
, req
->rq_ntrans
- 1);
667 /* Profile only reads for now */
669 static unsigned long nextstat
;
670 static unsigned long pkt_rtt
, pkt_len
, pkt_cnt
;
673 pkt_len
+= req
->rq_slen
+ copied
;
674 pkt_rtt
+= jiffies
- req
->rq_xtime
;
675 if (time_before(nextstat
, jiffies
)) {
676 printk("RPC: %lu %ld cwnd\n", jiffies
, xprt
->cwnd
);
677 printk("RPC: %ld %ld %ld %ld stat\n",
678 jiffies
, pkt_cnt
, pkt_len
, pkt_rtt
);
679 pkt_rtt
= pkt_len
= pkt_cnt
= 0;
680 nextstat
= jiffies
+ 5 * HZ
;
685 dprintk("RPC: %4d has input (%d bytes)\n", task
->tk_pid
, copied
);
686 list_del_init(&req
->rq_list
);
687 req
->rq_received
= req
->rq_private_buf
.len
= copied
;
689 /* ... and wake up the process. */
690 rpc_wake_up_task(task
);
695 * Input handler for RPC replies. Called from a bottom half and hence
699 udp_data_ready(struct sock
*sk
, int len
)
701 struct rpc_task
*task
;
702 struct rpc_xprt
*xprt
;
703 struct rpc_rqst
*rovr
;
705 int err
, repsize
, copied
;
708 read_lock(&sk
->sk_callback_lock
);
709 dprintk("RPC: udp_data_ready...\n");
710 if (!(xprt
= xprt_from_sock(sk
))) {
711 printk("RPC: udp_data_ready request not found!\n");
715 dprintk("RPC: udp_data_ready client %p\n", xprt
);
717 if ((skb
= skb_recv_datagram(sk
, 0, 1, &err
)) == NULL
)
723 repsize
= skb
->len
- sizeof(struct udphdr
);
725 printk("RPC: impossible RPC reply size %d!\n", repsize
);
729 /* Copy the XID from the skb... */
730 xp
= skb_header_pointer(skb
, sizeof(struct udphdr
),
731 sizeof(_xid
), &_xid
);
735 /* Look up and lock the request corresponding to the given XID */
736 spin_lock(&xprt
->sock_lock
);
737 rovr
= xprt_lookup_rqst(xprt
, *xp
);
740 task
= rovr
->rq_task
;
742 dprintk("RPC: %4d received reply\n", task
->tk_pid
);
744 if ((copied
= rovr
->rq_private_buf
.buflen
) > repsize
)
747 /* Suck it into the iovec, verify checksum if not done by hw. */
748 if (csum_partial_copy_to_xdr(&rovr
->rq_private_buf
, skb
))
751 /* Something worked... */
752 dst_confirm(skb
->dst
);
754 xprt_complete_rqst(xprt
, rovr
, copied
);
757 spin_unlock(&xprt
->sock_lock
);
759 skb_free_datagram(sk
, skb
);
761 read_unlock(&sk
->sk_callback_lock
);
765 * Copy from an skb into memory and shrink the skb.
768 tcp_copy_data(skb_reader_t
*desc
, void *p
, size_t len
)
770 if (len
> desc
->count
)
772 if (skb_copy_bits(desc
->skb
, desc
->offset
, p
, len
)) {
773 dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n",
779 dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n",
785 * TCP read fragment marker
788 tcp_read_fraghdr(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
793 p
= ((char *) &xprt
->tcp_recm
) + xprt
->tcp_offset
;
794 len
= sizeof(xprt
->tcp_recm
) - xprt
->tcp_offset
;
795 used
= tcp_copy_data(desc
, p
, len
);
796 xprt
->tcp_offset
+= used
;
799 xprt
->tcp_reclen
= ntohl(xprt
->tcp_recm
);
800 if (xprt
->tcp_reclen
& 0x80000000)
801 xprt
->tcp_flags
|= XPRT_LAST_FRAG
;
803 xprt
->tcp_flags
&= ~XPRT_LAST_FRAG
;
804 xprt
->tcp_reclen
&= 0x7fffffff;
805 xprt
->tcp_flags
&= ~XPRT_COPY_RECM
;
806 xprt
->tcp_offset
= 0;
807 /* Sanity check of the record length */
808 if (xprt
->tcp_reclen
< 4) {
809 printk(KERN_ERR
"RPC: Invalid TCP record fragment length\n");
810 xprt_disconnect(xprt
);
812 dprintk("RPC: reading TCP record fragment of length %d\n",
817 tcp_check_recm(struct rpc_xprt
*xprt
)
819 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
820 xprt
, xprt
->tcp_copied
, xprt
->tcp_offset
, xprt
->tcp_reclen
, xprt
->tcp_flags
);
821 if (xprt
->tcp_offset
== xprt
->tcp_reclen
) {
822 xprt
->tcp_flags
|= XPRT_COPY_RECM
;
823 xprt
->tcp_offset
= 0;
824 if (xprt
->tcp_flags
& XPRT_LAST_FRAG
) {
825 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
826 xprt
->tcp_flags
|= XPRT_COPY_XID
;
827 xprt
->tcp_copied
= 0;
836 tcp_read_xid(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
841 len
= sizeof(xprt
->tcp_xid
) - xprt
->tcp_offset
;
842 dprintk("RPC: reading XID (%Zu bytes)\n", len
);
843 p
= ((char *) &xprt
->tcp_xid
) + xprt
->tcp_offset
;
844 used
= tcp_copy_data(desc
, p
, len
);
845 xprt
->tcp_offset
+= used
;
848 xprt
->tcp_flags
&= ~XPRT_COPY_XID
;
849 xprt
->tcp_flags
|= XPRT_COPY_DATA
;
850 xprt
->tcp_copied
= 4;
851 dprintk("RPC: reading reply for XID %08x\n",
852 ntohl(xprt
->tcp_xid
));
853 tcp_check_recm(xprt
);
857 * TCP read and complete request
860 tcp_read_request(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
862 struct rpc_rqst
*req
;
863 struct xdr_buf
*rcvbuf
;
867 /* Find and lock the request corresponding to this xid */
868 spin_lock(&xprt
->sock_lock
);
869 req
= xprt_lookup_rqst(xprt
, xprt
->tcp_xid
);
871 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
872 dprintk("RPC: XID %08x request not found!\n",
873 ntohl(xprt
->tcp_xid
));
874 spin_unlock(&xprt
->sock_lock
);
878 rcvbuf
= &req
->rq_private_buf
;
880 if (len
> xprt
->tcp_reclen
- xprt
->tcp_offset
) {
881 skb_reader_t my_desc
;
883 len
= xprt
->tcp_reclen
- xprt
->tcp_offset
;
884 memcpy(&my_desc
, desc
, sizeof(my_desc
));
886 r
= xdr_partial_copy_from_skb(rcvbuf
, xprt
->tcp_copied
,
887 &my_desc
, tcp_copy_data
);
891 r
= xdr_partial_copy_from_skb(rcvbuf
, xprt
->tcp_copied
,
892 desc
, tcp_copy_data
);
895 xprt
->tcp_copied
+= r
;
896 xprt
->tcp_offset
+= r
;
899 /* Error when copying to the receive buffer,
900 * usually because we weren't able to allocate
901 * additional buffer pages. All we can do now
902 * is turn off XPRT_COPY_DATA, so the request
903 * will not receive any additional updates,
905 * Any remaining data from this record will
908 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
909 dprintk("RPC: XID %08x truncated request\n",
910 ntohl(xprt
->tcp_xid
));
911 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
912 xprt
, xprt
->tcp_copied
, xprt
->tcp_offset
, xprt
->tcp_reclen
);
916 dprintk("RPC: XID %08x read %Zd bytes\n",
917 ntohl(xprt
->tcp_xid
), r
);
918 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
919 xprt
, xprt
->tcp_copied
, xprt
->tcp_offset
, xprt
->tcp_reclen
);
921 if (xprt
->tcp_copied
== req
->rq_private_buf
.buflen
)
922 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
923 else if (xprt
->tcp_offset
== xprt
->tcp_reclen
) {
924 if (xprt
->tcp_flags
& XPRT_LAST_FRAG
)
925 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
929 if (!(xprt
->tcp_flags
& XPRT_COPY_DATA
)) {
930 dprintk("RPC: %4d received reply complete\n",
931 req
->rq_task
->tk_pid
);
932 xprt_complete_rqst(xprt
, req
, xprt
->tcp_copied
);
934 spin_unlock(&xprt
->sock_lock
);
935 tcp_check_recm(xprt
);
939 * TCP discard extra bytes from a short read
942 tcp_read_discard(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
946 len
= xprt
->tcp_reclen
- xprt
->tcp_offset
;
947 if (len
> desc
->count
)
951 xprt
->tcp_offset
+= len
;
952 dprintk("RPC: discarded %Zu bytes\n", len
);
953 tcp_check_recm(xprt
);
957 * TCP record receive routine
958 * We first have to grab the record marker, then the XID, then the data.
961 tcp_data_recv(read_descriptor_t
*rd_desc
, struct sk_buff
*skb
,
962 unsigned int offset
, size_t len
)
964 struct rpc_xprt
*xprt
= rd_desc
->arg
.data
;
965 skb_reader_t desc
= {
972 dprintk("RPC: tcp_data_recv\n");
974 /* Read in a new fragment marker if necessary */
975 /* Can we ever really expect to get completely empty fragments? */
976 if (xprt
->tcp_flags
& XPRT_COPY_RECM
) {
977 tcp_read_fraghdr(xprt
, &desc
);
980 /* Read in the xid if necessary */
981 if (xprt
->tcp_flags
& XPRT_COPY_XID
) {
982 tcp_read_xid(xprt
, &desc
);
985 /* Read in the request data */
986 if (xprt
->tcp_flags
& XPRT_COPY_DATA
) {
987 tcp_read_request(xprt
, &desc
);
990 /* Skip over any trailing bytes on short reads */
991 tcp_read_discard(xprt
, &desc
);
992 } while (desc
.count
);
993 dprintk("RPC: tcp_data_recv done\n");
994 return len
- desc
.count
;
997 static void tcp_data_ready(struct sock
*sk
, int bytes
)
999 struct rpc_xprt
*xprt
;
1000 read_descriptor_t rd_desc
;
1002 read_lock(&sk
->sk_callback_lock
);
1003 dprintk("RPC: tcp_data_ready...\n");
1004 if (!(xprt
= xprt_from_sock(sk
))) {
1005 printk("RPC: tcp_data_ready socket info not found!\n");
1011 /* We use rd_desc to pass struct xprt to tcp_data_recv */
1012 rd_desc
.arg
.data
= xprt
;
1013 rd_desc
.count
= 65536;
1014 tcp_read_sock(sk
, &rd_desc
, tcp_data_recv
);
1016 read_unlock(&sk
->sk_callback_lock
);
1020 tcp_state_change(struct sock
*sk
)
1022 struct rpc_xprt
*xprt
;
1024 read_lock(&sk
->sk_callback_lock
);
1025 if (!(xprt
= xprt_from_sock(sk
)))
1027 dprintk("RPC: tcp_state_change client %p...\n", xprt
);
1028 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
1029 sk
->sk_state
, xprt_connected(xprt
),
1030 sock_flag(sk
, SOCK_DEAD
),
1031 sock_flag(sk
, SOCK_ZAPPED
));
1033 switch (sk
->sk_state
) {
1034 case TCP_ESTABLISHED
:
1035 spin_lock_bh(&xprt
->sock_lock
);
1036 if (!xprt_test_and_set_connected(xprt
)) {
1037 /* Reset TCP record info */
1038 xprt
->tcp_offset
= 0;
1039 xprt
->tcp_reclen
= 0;
1040 xprt
->tcp_copied
= 0;
1041 xprt
->tcp_flags
= XPRT_COPY_RECM
| XPRT_COPY_XID
;
1042 rpc_wake_up(&xprt
->pending
);
1044 spin_unlock_bh(&xprt
->sock_lock
);
1050 xprt_disconnect(xprt
);
1054 read_unlock(&sk
->sk_callback_lock
);
1058 * Called when more output buffer space is available for this socket.
1059 * We try not to wake our writers until they can make "significant"
1060 * progress, otherwise we'll waste resources thrashing sock_sendmsg
1061 * with a bunch of small requests.
1064 xprt_write_space(struct sock
*sk
)
1066 struct rpc_xprt
*xprt
;
1067 struct socket
*sock
;
1069 read_lock(&sk
->sk_callback_lock
);
1070 if (!(xprt
= xprt_from_sock(sk
)) || !(sock
= sk
->sk_socket
))
1075 /* Wait until we have enough socket memory */
1077 /* from net/core/stream.c:sk_stream_write_space */
1078 if (sk_stream_wspace(sk
) < sk_stream_min_wspace(sk
))
1081 /* from net/core/sock.c:sock_def_write_space */
1082 if (!sock_writeable(sk
))
1086 if (!test_and_clear_bit(SOCK_NOSPACE
, &sock
->flags
))
1089 spin_lock_bh(&xprt
->sock_lock
);
1091 rpc_wake_up_task(xprt
->snd_task
);
1092 spin_unlock_bh(&xprt
->sock_lock
);
1094 read_unlock(&sk
->sk_callback_lock
);
1098 * RPC receive timeout handler.
1101 xprt_timer(struct rpc_task
*task
)
1103 struct rpc_rqst
*req
= task
->tk_rqstp
;
1104 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1106 spin_lock(&xprt
->sock_lock
);
1107 if (req
->rq_received
)
1110 xprt_adjust_cwnd(req
->rq_xprt
, -ETIMEDOUT
);
1111 __xprt_put_cong(xprt
, req
);
1113 dprintk("RPC: %4d xprt_timer (%s request)\n",
1114 task
->tk_pid
, req
? "pending" : "backlogged");
1116 task
->tk_status
= -ETIMEDOUT
;
1118 task
->tk_timeout
= 0;
1119 rpc_wake_up_task(task
);
1120 spin_unlock(&xprt
->sock_lock
);
1124 * Place the actual RPC call.
1125 * We have to copy the iovec because sendmsg fiddles with its contents.
1128 xprt_prepare_transmit(struct rpc_task
*task
)
1130 struct rpc_rqst
*req
= task
->tk_rqstp
;
1131 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1134 dprintk("RPC: %4d xprt_prepare_transmit\n", task
->tk_pid
);
1139 spin_lock_bh(&xprt
->sock_lock
);
1140 if (req
->rq_received
&& !req
->rq_bytes_sent
) {
1141 err
= req
->rq_received
;
1144 if (!__xprt_lock_write(xprt
, task
)) {
1149 if (!xprt_connected(xprt
)) {
1154 spin_unlock_bh(&xprt
->sock_lock
);
1159 xprt_transmit(struct rpc_task
*task
)
1161 struct rpc_clnt
*clnt
= task
->tk_client
;
1162 struct rpc_rqst
*req
= task
->tk_rqstp
;
1163 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1164 int status
, retry
= 0;
1167 dprintk("RPC: %4d xprt_transmit(%u)\n", task
->tk_pid
, req
->rq_slen
);
1169 /* set up everything as needed. */
1170 /* Write the record marker */
1172 u32
*marker
= req
->rq_svec
[0].iov_base
;
1174 *marker
= htonl(0x80000000|(req
->rq_slen
-sizeof(*marker
)));
1178 if (!req
->rq_received
) {
1179 if (list_empty(&req
->rq_list
)) {
1180 spin_lock_bh(&xprt
->sock_lock
);
1181 /* Update the softirq receive buffer */
1182 memcpy(&req
->rq_private_buf
, &req
->rq_rcv_buf
,
1183 sizeof(req
->rq_private_buf
));
1184 /* Add request to the receive list */
1185 list_add_tail(&req
->rq_list
, &xprt
->recv
);
1186 spin_unlock_bh(&xprt
->sock_lock
);
1187 xprt_reset_majortimeo(req
);
1188 /* Turn off autodisconnect */
1189 del_singleshot_timer_sync(&xprt
->timer
);
1191 } else if (!req
->rq_bytes_sent
)
1194 /* Continue transmitting the packet/record. We must be careful
1195 * to cope with writespace callbacks arriving _after_ we have
1196 * called xprt_sendmsg().
1199 req
->rq_xtime
= jiffies
;
1200 status
= xprt_sendmsg(xprt
, req
);
1206 req
->rq_bytes_sent
+= status
;
1208 /* If we've sent the entire packet, immediately
1209 * reset the count of bytes sent. */
1210 if (req
->rq_bytes_sent
>= req
->rq_slen
) {
1211 req
->rq_bytes_sent
= 0;
1215 if (status
>= req
->rq_slen
)
1221 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
1222 task
->tk_pid
, req
->rq_slen
- req
->rq_bytes_sent
,
1230 /* Note: at this point, task->tk_sleeping has not yet been set,
1231 * hence there is no danger of the waking up task being put on
1232 * schedq, and being picked up by a parallel run of rpciod().
1234 task
->tk_status
= status
;
1238 if (test_bit(SOCK_ASYNC_NOSPACE
, &xprt
->sock
->flags
)) {
1239 /* Protect against races with xprt_write_space */
1240 spin_lock_bh(&xprt
->sock_lock
);
1241 /* Don't race with disconnect */
1242 if (!xprt_connected(xprt
))
1243 task
->tk_status
= -ENOTCONN
;
1244 else if (test_bit(SOCK_NOSPACE
, &xprt
->sock
->flags
)) {
1245 task
->tk_timeout
= req
->rq_timeout
;
1246 rpc_sleep_on(&xprt
->pending
, task
, NULL
, NULL
);
1248 spin_unlock_bh(&xprt
->sock_lock
);
1251 /* Keep holding the socket if it is blocked */
1252 rpc_delay(task
, HZ
>>4);
1255 task
->tk_timeout
= RPC_REESTABLISH_TIMEOUT
;
1256 rpc_sleep_on(&xprt
->sending
, task
, NULL
, NULL
);
1261 xprt_disconnect(xprt
);
1263 xprt_release_write(xprt
, task
);
1266 dprintk("RPC: %4d xmit complete\n", task
->tk_pid
);
1267 /* Set the task's receive timeout value */
1268 spin_lock_bh(&xprt
->sock_lock
);
1269 if (!xprt
->nocong
) {
1270 int timer
= task
->tk_msg
.rpc_proc
->p_timer
;
1271 task
->tk_timeout
= rpc_calc_rto(clnt
->cl_rtt
, timer
);
1272 task
->tk_timeout
<<= rpc_ntimeo(clnt
->cl_rtt
, timer
) + req
->rq_retries
;
1273 if (task
->tk_timeout
> xprt
->timeout
.to_maxval
|| task
->tk_timeout
== 0)
1274 task
->tk_timeout
= xprt
->timeout
.to_maxval
;
1276 task
->tk_timeout
= req
->rq_timeout
;
1277 /* Don't race with disconnect */
1278 if (!xprt_connected(xprt
))
1279 task
->tk_status
= -ENOTCONN
;
1280 else if (!req
->rq_received
)
1281 rpc_sleep_on(&xprt
->pending
, task
, NULL
, xprt_timer
);
1282 __xprt_release_write(xprt
, task
);
1283 spin_unlock_bh(&xprt
->sock_lock
);
1287 * Reserve an RPC call slot.
1290 do_xprt_reserve(struct rpc_task
*task
)
1292 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1294 task
->tk_status
= 0;
1297 if (!list_empty(&xprt
->free
)) {
1298 struct rpc_rqst
*req
= list_entry(xprt
->free
.next
, struct rpc_rqst
, rq_list
);
1299 list_del_init(&req
->rq_list
);
1300 task
->tk_rqstp
= req
;
1301 xprt_request_init(task
, xprt
);
1304 dprintk("RPC: waiting for request slot\n");
1305 task
->tk_status
= -EAGAIN
;
1306 task
->tk_timeout
= 0;
1307 rpc_sleep_on(&xprt
->backlog
, task
, NULL
, NULL
);
1311 xprt_reserve(struct rpc_task
*task
)
1313 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1315 task
->tk_status
= -EIO
;
1316 if (!xprt
->shutdown
) {
1317 spin_lock(&xprt
->xprt_lock
);
1318 do_xprt_reserve(task
);
1319 spin_unlock(&xprt
->xprt_lock
);
1324 * Allocate a 'unique' XID
1326 static inline u32
xprt_alloc_xid(struct rpc_xprt
*xprt
)
1331 static inline void xprt_init_xid(struct rpc_xprt
*xprt
)
1333 get_random_bytes(&xprt
->xid
, sizeof(xprt
->xid
));
1337 * Initialize RPC request
1340 xprt_request_init(struct rpc_task
*task
, struct rpc_xprt
*xprt
)
1342 struct rpc_rqst
*req
= task
->tk_rqstp
;
1344 req
->rq_timeout
= xprt
->timeout
.to_initval
;
1345 req
->rq_task
= task
;
1346 req
->rq_xprt
= xprt
;
1347 req
->rq_xid
= xprt_alloc_xid(xprt
);
1348 dprintk("RPC: %4d reserved req %p xid %08x\n", task
->tk_pid
,
1349 req
, ntohl(req
->rq_xid
));
1353 * Release an RPC call slot
1356 xprt_release(struct rpc_task
*task
)
1358 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1359 struct rpc_rqst
*req
;
1361 if (!(req
= task
->tk_rqstp
))
1363 spin_lock_bh(&xprt
->sock_lock
);
1364 __xprt_release_write(xprt
, task
);
1365 __xprt_put_cong(xprt
, req
);
1366 if (!list_empty(&req
->rq_list
))
1367 list_del(&req
->rq_list
);
1368 xprt
->last_used
= jiffies
;
1369 if (list_empty(&xprt
->recv
) && !xprt
->shutdown
)
1370 mod_timer(&xprt
->timer
, xprt
->last_used
+ XPRT_IDLE_TIMEOUT
);
1371 spin_unlock_bh(&xprt
->sock_lock
);
1372 task
->tk_rqstp
= NULL
;
1373 memset(req
, 0, sizeof(*req
)); /* mark unused */
1375 dprintk("RPC: %4d release request %p\n", task
->tk_pid
, req
);
1377 spin_lock(&xprt
->xprt_lock
);
1378 list_add(&req
->rq_list
, &xprt
->free
);
1379 xprt_clear_backlog(xprt
);
1380 spin_unlock(&xprt
->xprt_lock
);
1384 * Set default timeout parameters
1387 xprt_default_timeout(struct rpc_timeout
*to
, int proto
)
1389 if (proto
== IPPROTO_UDP
)
1390 xprt_set_timeout(to
, 5, 5 * HZ
);
1392 xprt_set_timeout(to
, 2, 60 * HZ
);
1396 * Set constant timeout
1399 xprt_set_timeout(struct rpc_timeout
*to
, unsigned int retr
, unsigned long incr
)
1402 to
->to_increment
= incr
;
1403 to
->to_maxval
= to
->to_initval
+ (incr
* retr
);
1404 to
->to_retries
= retr
;
1405 to
->to_exponential
= 0;
1408 unsigned int xprt_udp_slot_table_entries
= RPC_DEF_SLOT_TABLE
;
1409 unsigned int xprt_tcp_slot_table_entries
= RPC_DEF_SLOT_TABLE
;
1412 * Initialize an RPC client
1414 static struct rpc_xprt
*
1415 xprt_setup(int proto
, struct sockaddr_in
*ap
, struct rpc_timeout
*to
)
1417 struct rpc_xprt
*xprt
;
1418 unsigned int entries
;
1419 size_t slot_table_size
;
1420 struct rpc_rqst
*req
;
1422 dprintk("RPC: setting up %s transport...\n",
1423 proto
== IPPROTO_UDP
? "UDP" : "TCP");
1425 entries
= (proto
== IPPROTO_TCP
)?
1426 xprt_tcp_slot_table_entries
: xprt_udp_slot_table_entries
;
1428 if ((xprt
= kmalloc(sizeof(struct rpc_xprt
), GFP_KERNEL
)) == NULL
)
1429 return ERR_PTR(-ENOMEM
);
1430 memset(xprt
, 0, sizeof(*xprt
)); /* Nnnngh! */
1431 xprt
->max_reqs
= entries
;
1432 slot_table_size
= entries
* sizeof(xprt
->slot
[0]);
1433 xprt
->slot
= kmalloc(slot_table_size
, GFP_KERNEL
);
1434 if (xprt
->slot
== NULL
) {
1436 return ERR_PTR(-ENOMEM
);
1438 memset(xprt
->slot
, 0, slot_table_size
);
1442 xprt
->stream
= (proto
== IPPROTO_TCP
)? 1 : 0;
1444 xprt
->cwnd
= RPC_MAXCWND(xprt
);
1446 xprt
->max_payload
= (1U << 31) - 1;
1448 xprt
->cwnd
= RPC_INITCWND
;
1449 xprt
->max_payload
= (1U << 16) - (MAX_HEADER
<< 3);
1451 spin_lock_init(&xprt
->sock_lock
);
1452 spin_lock_init(&xprt
->xprt_lock
);
1453 init_waitqueue_head(&xprt
->cong_wait
);
1455 INIT_LIST_HEAD(&xprt
->free
);
1456 INIT_LIST_HEAD(&xprt
->recv
);
1457 INIT_WORK(&xprt
->sock_connect
, xprt_socket_connect
, xprt
);
1458 INIT_WORK(&xprt
->task_cleanup
, xprt_socket_autoclose
, xprt
);
1459 init_timer(&xprt
->timer
);
1460 xprt
->timer
.function
= xprt_init_autodisconnect
;
1461 xprt
->timer
.data
= (unsigned long) xprt
;
1462 xprt
->last_used
= jiffies
;
1463 xprt
->port
= XPRT_MAX_RESVPORT
;
1465 /* Set timeout parameters */
1467 xprt
->timeout
= *to
;
1469 xprt_default_timeout(&xprt
->timeout
, xprt
->prot
);
1471 rpc_init_wait_queue(&xprt
->pending
, "xprt_pending");
1472 rpc_init_wait_queue(&xprt
->sending
, "xprt_sending");
1473 rpc_init_wait_queue(&xprt
->resend
, "xprt_resend");
1474 rpc_init_priority_wait_queue(&xprt
->backlog
, "xprt_backlog");
1476 /* initialize free list */
1477 for (req
= &xprt
->slot
[entries
-1]; req
>= &xprt
->slot
[0]; req
--)
1478 list_add(&req
->rq_list
, &xprt
->free
);
1480 xprt_init_xid(xprt
);
1482 /* Check whether we want to use a reserved port */
1483 xprt
->resvport
= capable(CAP_NET_BIND_SERVICE
) ? 1 : 0;
1485 dprintk("RPC: created transport %p with %u slots\n", xprt
,
1492 * Bind to a reserved port
1494 static inline int xprt_bindresvport(struct rpc_xprt
*xprt
, struct socket
*sock
)
1496 struct sockaddr_in myaddr
= {
1497 .sin_family
= AF_INET
,
1501 /* Were we already bound to a given port? Try to reuse it */
1504 myaddr
.sin_port
= htons(port
);
1505 err
= sock
->ops
->bind(sock
, (struct sockaddr
*) &myaddr
,
1512 port
= XPRT_MAX_RESVPORT
;
1513 } while (err
== -EADDRINUSE
&& port
!= xprt
->port
);
1515 printk("RPC: Can't bind to reserved port (%d).\n", -err
);
1520 xprt_bind_socket(struct rpc_xprt
*xprt
, struct socket
*sock
)
1522 struct sock
*sk
= sock
->sk
;
1527 write_lock_bh(&sk
->sk_callback_lock
);
1528 sk
->sk_user_data
= xprt
;
1529 xprt
->old_data_ready
= sk
->sk_data_ready
;
1530 xprt
->old_state_change
= sk
->sk_state_change
;
1531 xprt
->old_write_space
= sk
->sk_write_space
;
1532 if (xprt
->prot
== IPPROTO_UDP
) {
1533 sk
->sk_data_ready
= udp_data_ready
;
1534 sk
->sk_no_check
= UDP_CSUM_NORCV
;
1535 xprt_set_connected(xprt
);
1537 tcp_sk(sk
)->nonagle
= 1; /* disable Nagle's algorithm */
1538 sk
->sk_data_ready
= tcp_data_ready
;
1539 sk
->sk_state_change
= tcp_state_change
;
1540 xprt_clear_connected(xprt
);
1542 sk
->sk_write_space
= xprt_write_space
;
1544 /* Reset to new socket */
1547 write_unlock_bh(&sk
->sk_callback_lock
);
1553 * Set socket buffer length
1556 xprt_sock_setbufsize(struct rpc_xprt
*xprt
)
1558 struct sock
*sk
= xprt
->inet
;
1562 if (xprt
->rcvsize
) {
1563 sk
->sk_userlocks
|= SOCK_RCVBUF_LOCK
;
1564 sk
->sk_rcvbuf
= xprt
->rcvsize
* xprt
->max_reqs
* 2;
1566 if (xprt
->sndsize
) {
1567 sk
->sk_userlocks
|= SOCK_SNDBUF_LOCK
;
1568 sk
->sk_sndbuf
= xprt
->sndsize
* xprt
->max_reqs
* 2;
1569 sk
->sk_write_space(sk
);
1574 * Datastream sockets are created here, but xprt_connect will create
1575 * and connect stream sockets.
1577 static struct socket
* xprt_create_socket(struct rpc_xprt
*xprt
, int proto
, int resvport
)
1579 struct socket
*sock
;
1582 dprintk("RPC: xprt_create_socket(%s %d)\n",
1583 (proto
== IPPROTO_UDP
)? "udp" : "tcp", proto
);
1585 type
= (proto
== IPPROTO_UDP
)? SOCK_DGRAM
: SOCK_STREAM
;
1587 if ((err
= sock_create_kern(PF_INET
, type
, proto
, &sock
)) < 0) {
1588 printk("RPC: can't create socket (%d).\n", -err
);
1592 /* If the caller has the capability, bind to a reserved port */
1593 if (resvport
&& xprt_bindresvport(xprt
, sock
) < 0) {
1594 printk("RPC: can't bind to reserved port.\n");
/*
 * Create an RPC client transport given the protocol and peer address.
 *
 * Thin wrapper around xprt_setup() that logs the outcome; returns the
 * new transport or an ERR_PTR on failure (propagated from xprt_setup).
 * (Restored the IS_ERR branch and final return dropped from this copy.)
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;

	xprt = xprt_setup(proto, sap, to);
	if (IS_ERR(xprt))
		dprintk("RPC: xprt_create_proto failed\n");
	else
		dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);
	return xprt;
}
1622 * Prepare for transport shutdown.
1625 xprt_shutdown(struct rpc_xprt
*xprt
)
1628 rpc_wake_up(&xprt
->sending
);
1629 rpc_wake_up(&xprt
->resend
);
1630 rpc_wake_up(&xprt
->pending
);
1631 rpc_wake_up(&xprt
->backlog
);
1632 wake_up(&xprt
->cong_wait
);
1633 del_timer_sync(&xprt
->timer
);
1635 /* synchronously wait for connect worker to finish */
1636 cancel_delayed_work(&xprt
->sock_connect
);
1637 flush_scheduled_work();
1641 * Clear the xprt backlog queue
1644 xprt_clear_backlog(struct rpc_xprt
*xprt
) {
1645 rpc_wake_up_next(&xprt
->backlog
);
1646 wake_up(&xprt
->cong_wait
);
1651 * Destroy an RPC transport, killing off all requests.
1654 xprt_destroy(struct rpc_xprt
*xprt
)
1656 dprintk("RPC: destroying transport %p\n", xprt
);
1657 xprt_shutdown(xprt
);
1658 xprt_disconnect(xprt
);