/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
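
/*
 * Illustrative sketch only (not from the original source): the call
 * flow described above, expressed against this file's entry points.
 * Error handling and the scheduler interplay are omitted; treat the
 * exact sequencing as an assumption, not as documented API.
 */
#if 0
	xprt_reserve(task);		/* grab a slot, or sleep on xprt->backlog */
	/* ... marshal the call into task->tk_rqstp->rq_snd_buf ... */
	if (xprt_prepare_transmit(task) == 0)
		xprt_transmit(task);	/* send; sleep on xprt->pending for the reply */
	/* ... data_ready handler matches the XID and wakes the task ... */
	xprt_release(task);		/* return the slot, then process the reply */
#endif
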
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF	(8)
#define XPRT_IDLE_TIMEOUT	(5*60*HZ)
#define XPRT_MAX_RESVPORT	(800)

/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_disconnect(struct rpc_xprt *);
static void	xprt_connect_status(struct rpc_task *task);
static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap,
						struct rpc_timeout *to);
static struct socket *xprt_create_socket(struct rpc_xprt *, int, int);
static void	xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static int	xprt_clear_backlog(struct rpc_xprt *xprt);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8	*buf = (u8 *) packet;
	int	j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connects from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return 1;
	}
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->sockstate);
	smp_mb__after_clear_bit();
out_sleep:
	dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req && req->rq_ntrans)
		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
	else
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	return 0;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->sock_lock);
	retval = __xprt_lock_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
	return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
		return;
	if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
		goto out_unlock;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			goto out_unlock;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		struct rpc_rqst *req = task->tk_rqstp;
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return;
	}
out_unlock:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->sockstate);
	smp_mb__after_clear_bit();
}

/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt->snd_task = NULL;
		smp_mb__before_clear_bit();
		clear_bit(XPRT_LOCKED, &xprt->sockstate);
		smp_mb__after_clear_bit();
		__xprt_lock_write_next(xprt);
	}
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket	*sock = xprt->sock;
	struct xdr_buf	*xdr = &req->rq_snd_buf;
	struct sockaddr *addr = NULL;
	int		addrlen = 0;
	unsigned int	skip;
	int		result;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* For UDP, we need to provide an address */
	if (!xprt->stream) {
		addr = (struct sockaddr *) &xprt->addr;
		addrlen = sizeof(xprt->addr);
	}
	/* Don't repeat bytes */
	skip = req->rq_bytes_sent;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
	case -EAGAIN:
		break;
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		if (xprt->stream)
			result = -ENOTCONN;
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long	cwnd;

	cwnd = xprt->cwnd;
	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
}
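
/*
 * Worked example of the adjustment above (a sketch; it assumes
 * RPC_CWNDSCALE = 1 << 10, i.e. one slot's worth of window, as
 * defined in the sunrpc headers). With cwnd = 2048:
 *
 *	reply:	 cwnd += (1024*1024 + 1024) / 2048 = 512
 *	timeout: cwnd >>= 1  ->  1024 (floor: RPC_CWNDSCALE)
 *
 * so the window opens by roughly RPC_CWNDSCALE^2/cwnd per reply
 * (about one slot per full window of replies) and halves on loss,
 * the classic additive-increase/multiplicative-decrease shape.
 */
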
/*
 * Reset the major timeout value
 */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	struct rpc_timeout *to = &req->rq_xprt->timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}

/*
 * Adjust timeout values etc for next retransmit
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_timeout *to = &xprt->timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
		pprintk("RPC: %lu retrans\n", jiffies);
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->sock_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->sock_lock);
		pprintk("RPC: %lu timeout\n", jiffies);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}
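
/*
 * Example of the resulting schedule (a sketch using the UDP defaults
 * installed by xprt_default_timeout() below: to_initval = to_increment
 * = 5*HZ, to_retries = 5, linear backoff): retransmits fire after 5s,
 * then 10s, 15s, ... until rq_majortimeo is reached; at that point the
 * minor timeout resets to to_initval, the RTT estimator restarts, and
 * -ETIMEDOUT is returned so the caller can fail a soft mount or keep
 * retrying a hard one.
 */
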
/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket	*sock = xprt->sock;
	struct sock	*sk = xprt->inet;

	if (!sk)
		return;

	write_lock_bh(&sk->sk_callback_lock);
	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->sk_user_data    = NULL;
	sk->sk_data_ready   = xprt->old_data_ready;
	sk->sk_state_change = xprt->old_state_change;
	sk->sk_write_space  = xprt->old_write_space;
	write_unlock_bh(&sk->sk_callback_lock);

	sock_release(sock);
}

static void
xprt_socket_autoclose(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;

	xprt_disconnect(xprt);
	xprt_close(xprt);
	xprt_release_write(xprt, NULL);
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->sock_lock);
	xprt_clear_connected(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Used to allow disconnection when we've been idle
 */
static void
xprt_init_autodisconnect(unsigned long data)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)data;

	spin_lock(&xprt->sock_lock);
	if (!list_empty(&xprt->recv) || xprt->shutdown)
		goto out_abort;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
		goto out_abort;
	spin_unlock(&xprt->sock_lock);
	/* Let keventd close the socket */
	if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0)
		xprt_release_write(xprt, NULL);
	else
		schedule_work(&xprt->task_cleanup);
	return;
out_abort:
	spin_unlock(&xprt->sock_lock);
}

static void xprt_socket_connect(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
	struct socket *sock = xprt->sock;
	int status = -EIO;

	if (xprt->shutdown || xprt->addr.sin_port == 0)
		goto out;

	/*
	 * Start by resetting any existing state
	 */
	xprt_close(xprt);
	sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport);
	if (sock == NULL) {
		/* couldn't create socket or bind to reserved port;
		 * this is likely a permanent error, so cause an abort */
		goto out;
	}
	xprt_bind_socket(xprt, sock);
	xprt_sock_setbufsize(xprt);

	status = 0;
	if (!xprt->stream)
		goto out;

	/*
	 * Tell the socket layer to start connecting...
	 */
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
			sizeof(xprt->addr), O_NONBLOCK);
	dprintk("RPC: %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
	if (status < 0) {
		switch (status) {
		case -EINPROGRESS:
		case -EALREADY:
			goto out_clear;
		}
	}
out:
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
out_clear:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTING, &xprt->sockstate);
	smp_mb__after_clear_bit();
}

/*
 * Attempt to connect a TCP socket.
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (xprt->shutdown) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;
	if (xprt_connected(xprt))
		goto out_write;

	task->tk_rqstp->rq_bytes_sent = 0;

	task->tk_timeout = RPC_CONNECT_TIMEOUT;
	rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
	if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
		/* Note: if we are here due to a dropped connection
		 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
		 * seconds
		 */
		if (xprt->sock != NULL)
			schedule_delayed_work(&xprt->sock_connect,
					RPC_REESTABLISH_TIMEOUT);
		else
			schedule_work(&xprt->sock_connect);
	}
	return;
 out_write:
	xprt_release_write(xprt, task);
}

/*
 * We arrive here when awoken from waiting on connection establishment.
 */
static void
xprt_connect_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	if (task->tk_status >= 0) {
		dprintk("RPC: %4d xprt_connect_status: connection established\n",
				task->tk_pid);
		return;
	}

	/* if soft mounted, just cause this RPC to fail */
	if (RPC_IS_SOFT(task))
		task->tk_status = -EIO;

	switch (task->tk_status) {
	case -ETIMEDOUT:
		dprintk("RPC: %4d xprt_connect_status: timed out\n",
				task->tk_pid);
		break;
	default:
		printk(KERN_ERR "RPC: error %d connecting to server %s\n",
				-task->tk_status, task->tk_client->cl_server);
	}
	xprt_release_write(xprt, task);
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct list_head *pos;
	struct rpc_rqst	*req = NULL;

	list_for_each(pos, &xprt->recv) {
		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
		if (entry->rq_xid == xid) {
			req = entry;
			break;
		}
	}
	return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task	*task = req->rq_task;
	struct rpc_clnt *clnt = task->tk_client;

	/* Adjust congestion window */
	if (!xprt->nocong) {
		unsigned timer = task->tk_msg.rpc_proc->p_timer;
		xprt_adjust_cwnd(xprt, copied);
		__xprt_put_cong(xprt, req);
		if (timer) {
			if (req->rq_ntrans == 1)
				rpc_update_rtt(clnt->cl_rtt, timer,
						(long)jiffies - req->rq_xtime);
			rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
		}
	}

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long	nextstat;
		static unsigned long	pkt_rtt, pkt_len, pkt_cnt;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	list_del_init(&req->rq_list);
	req->rq_received = req->rq_private_buf.len = copied;

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
}

static size_t
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	if (skb_copy_bits(desc->skb, desc->offset, to, len))
		return 0;
	desc->count -= len;
	desc->offset += len;
	return len;
}

static size_t
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
{
	unsigned int csum2, pos;

	if (len > desc->count)
		len = desc->count;
	pos = desc->offset;
	csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
	desc->csum = csum_block_add(desc->csum, csum2, pos);
	desc->count -= len;
	desc->offset += len;
	return len;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec. -DaveM
 */
static int
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	skb_reader_t desc;

	desc.skb = skb;
	desc.offset = sizeof(struct udphdr);
	desc.count = skb->len - desc.offset;

	if (skb->ip_summed == CHECKSUM_UNNECESSARY)
		goto no_checksum;

	desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
	if (desc.offset != skb->len) {
		unsigned int csum2;
		csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
		desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
	}
	if ((unsigned short)csum_fold(desc.csum))
		return -1;
	return 0;
no_checksum:
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
	return 0;
}
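
/*
 * A note on the arithmetic above (assumption-level commentary, not
 * from the original source): the Internet checksum can be computed
 * piecewise and combined, since the sum of two chunks is
 *
 *	csum_block_add(csum(chunk0), csum(chunk1), len(chunk0))
 *
 * which is what lets skb_read_and_csum_bits() fold per-chunk sums
 * into desc.csum while copying. A final csum_fold() of zero then
 * validates the whole datagram without a second pass over the data.
 */
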
/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task	*task;
	struct rpc_xprt	*xprt;
	struct rpc_rqst *rovr;
	struct sk_buff	*skb;
	int err, repsize, copied;
	u32 _xid, *xp;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC: udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC: udp_data_ready request not found!\n");
		goto out;
	}

	dprintk("RPC: udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->sock_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
	spin_unlock(&xprt->sock_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * Copy from an skb into memory and shrink the skb.
 */
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	if (skb_copy_bits(desc->skb, desc->offset, p, len))
		return 0;
	desc->offset += len;
	desc->count -= len;
	return len;
}

/*
 * TCP read fragment marker
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	if (xprt->tcp_reclen & 0x80000000)
		xprt->tcp_flags |= XPRT_LAST_FRAG;
	else
		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
	xprt->tcp_reclen &= 0x7fffffff;
	xprt->tcp_flags &= ~XPRT_COPY_RECM;
	xprt->tcp_offset = 0;
	/* Sanity check of the record length */
	if (xprt->tcp_reclen < 4) {
		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
		return;
	}
	dprintk("RPC: reading TCP record fragment of length %d\n",
			xprt->tcp_reclen);
}
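
/*
 * Example of the marker decoding above (RFC 1831 record marking): a
 * marker of 0x80000064 on the wire parses as
 *
 *	0x80000000 bit set -> XPRT_LAST_FRAG (final fragment)
 *	low 31 bits 0x64   -> tcp_reclen = 100 bytes of fragment data
 *
 * whereas 0x00000400 would announce a 1024-byte fragment with more
 * fragments of the same record still to follow.
 */
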
static inline void
tcp_check_recm(struct rpc_xprt *xprt)
{
	if (xprt->tcp_offset == xprt->tcp_reclen) {
		xprt->tcp_flags |= XPRT_COPY_RECM;
		xprt->tcp_offset = 0;
		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
			xprt->tcp_flags |= XPRT_COPY_XID;
			xprt->tcp_copied = 0;
		}
	}
}

/*
 * TCP read xid
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
	dprintk("RPC: reading XID (%Zu bytes)\n", len);
	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_flags &= ~XPRT_COPY_XID;
	xprt->tcp_flags |= XPRT_COPY_DATA;
	xprt->tcp_copied = 4;
	dprintk("RPC: reading reply for XID %08x\n",
			ntohl(xprt->tcp_xid));
	tcp_check_recm(xprt);
}

/*
 * TCP read and complete request
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->sock_lock);
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (!req) {
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC: XID %08x request not found!\n",
				ntohl(xprt->tcp_xid));
		spin_unlock(&xprt->sock_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
		skb_reader_t my_desc;

		len = xprt->tcp_reclen - xprt->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  &my_desc, tcp_copy_data);
		desc->count -= len;
		desc->offset += len;
	} else
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  desc, tcp_copy_data);
	xprt->tcp_copied += len;
	xprt->tcp_offset += len;

	if (xprt->tcp_copied == req->rq_private_buf.buflen)
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
	else if (xprt->tcp_offset == xprt->tcp_reclen) {
		if (xprt->tcp_flags & XPRT_LAST_FRAG)
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
	}

	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
		dprintk("RPC: %4d received reply complete\n",
				req->rq_task->tk_pid);
		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
	}
	spin_unlock(&xprt->sock_lock);
	tcp_check_recm(xprt);
}

/*
 * TCP discard extra bytes from a short read
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len;

	len = xprt->tcp_reclen - xprt->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	xprt->tcp_offset += len;
	tcp_check_recm(xprt);
}

/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
		unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	skb_reader_t desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
		.csum	= 0
	};

	dprintk("RPC: tcp_data_recv\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (xprt->tcp_flags & XPRT_COPY_RECM) {
			tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (xprt->tcp_flags & XPRT_COPY_XID) {
			tcp_read_xid(xprt, &desc);
			continue;
		}
		/* Read in the request data */
		if (xprt->tcp_flags & XPRT_COPY_DATA) {
			tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		tcp_read_discard(xprt, &desc);
	} while (desc.count);
	dprintk("RPC: tcp_data_recv done\n");
	return len - desc.count;
}
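
/*
 * The loop above is a small state machine driven by xprt->tcp_flags;
 * for one reply record it progresses roughly as:
 *
 *	XPRT_COPY_RECM -> tcp_read_fraghdr()  (4-byte record marker)
 *	XPRT_COPY_XID  -> tcp_read_xid()      (4-byte XID, request lookup)
 *	XPRT_COPY_DATA -> tcp_read_request()  (payload into rq_private_buf)
 *
 * with tcp_read_discard() soaking up bytes past the receive buffer and
 * tcp_check_recm() re-arming XPRT_COPY_RECM at each fragment boundary.
 */
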
static void tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC: tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC: tcp_data_ready socket info not found!\n");
		goto out;
	}
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to tcp_data_recv */
	rd_desc.arg.data = xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
	read_unlock(&sk->sk_callback_lock);
}

static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC: tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
				sk->sk_state, xprt_connected(xprt),
				sock_flag(sk, SOCK_DEAD),
				sock_flag(sk, SOCK_ZAPPED));

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock_bh(&xprt->sock_lock);
		if (!xprt_test_and_set_connected(xprt)) {
			/* Reset TCP record info */
			xprt->tcp_offset = 0;
			xprt->tcp_reclen = 0;
			xprt->tcp_copied = 0;
			xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
			rpc_wake_up(&xprt->pending);
		}
		spin_unlock_bh(&xprt->sock_lock);
		break;
	default:
		if (xprt_test_and_clear_connected(xprt))
			rpc_wake_up_status(&xprt->pending, -ENOTCONN);
		break;
	}
out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void
xprt_write_space(struct sock *sk)
{
	struct rpc_xprt *xprt;
	struct socket *sock;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* Wait until we have enough socket memory */
	if (xprt->stream) {
		/* from net/core/stream.c:sk_stream_write_space */
		if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
			goto out;
	} else {
		/* from net/core/sock.c:sock_def_write_space */
		if (!sock_writeable(sk))
			goto out;
	}

	if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
		goto out;

	spin_lock_bh(&xprt->sock_lock);
	if (xprt->snd_task)
		rpc_wake_up_task(xprt->snd_task);
	spin_unlock_bh(&xprt->sock_lock);
out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->sock_lock);
	if (req->rq_received)
		goto out;

	xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
	__xprt_put_cong(xprt, req);

	dprintk("RPC: %4d xprt_timer (%s request)\n",
		task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status = -ETIMEDOUT;
out:
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
	spin_unlock(&xprt->sock_lock);
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
int
xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	int err = 0;

	dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);

	if (xprt->shutdown)
		return -EIO;

	spin_lock_bh(&xprt->sock_lock);
	if (req->rq_received && !req->rq_bytes_sent) {
		err = req->rq_received;
		goto out_unlock;
	}
	if (!__xprt_lock_write(xprt, task)) {
		err = -EAGAIN;
		goto out_unlock;
	}

	if (!xprt_connected(xprt)) {
		err = -ENOTCONN;
		goto out_unlock;
	}
out_unlock:
	spin_unlock_bh(&xprt->sock_lock);
	return err;
}

void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	int status, retry = 0;

	dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

	/* set up everything as needed. */
	/* Write the record marker */
	if (xprt->stream) {
		u32 *marker = req->rq_svec[0].iov_base;

		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
	}

	if (!req->rq_received) {
		if (list_empty(&req->rq_list)) {
			spin_lock_bh(&xprt->sock_lock);
			/* Update the softirq receive buffer */
			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
					sizeof(req->rq_private_buf));
			/* Add request to the receive list */
			list_add_tail(&req->rq_list, &xprt->recv);
			spin_unlock_bh(&xprt->sock_lock);
			xprt_reset_majortimeo(req);
		}
	} else if (!req->rq_bytes_sent)
		return;

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		req->rq_xtime = jiffies;
		status = xprt_sendmsg(xprt, req);

		if (status < 0)
			break;

		if (xprt->stream) {
			req->rq_bytes_sent += status;

			/* If we've sent the entire packet, immediately
			 * reset the count of bytes sent. */
			if (req->rq_bytes_sent >= req->rq_slen) {
				req->rq_bytes_sent = 0;
				goto out_receive;
			}
		} else {
			if (status >= req->rq_slen)
				goto out_receive;
			status = -EAGAIN;
			break;
		}

		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
				req->rq_slen);

		status = -EAGAIN;
		if (retry++ > 50)
			break;
	}

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 *	 hence there is no danger of the waking up task being put on
	 *	 schedq, and being picked up by a parallel run of rpciod().
	 */
	task->tk_status = status;

	switch (status) {
	case -EAGAIN:
		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
			/* Protect against races with xprt_write_space */
			spin_lock_bh(&xprt->sock_lock);
			/* Don't race with disconnect */
			if (!xprt_connected(xprt))
				task->tk_status = -ENOTCONN;
			else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
				task->tk_timeout = req->rq_timeout;
				rpc_sleep_on(&xprt->pending, task, NULL, NULL);
			}
			spin_unlock_bh(&xprt->sock_lock);
			return;
		}
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
		return;
	case -ECONNREFUSED:
		task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	case -ENOTCONN:
		return;
	default:
		if (xprt->stream)
			xprt_disconnect(xprt);
	}
	xprt_release_write(xprt, task);
	return;
 out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	/* Set the task's receive timeout value */
	spin_lock_bh(&xprt->sock_lock);
	if (!xprt->nocong) {
		int timer = task->tk_msg.rpc_proc->p_timer;
		task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
		task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;
		if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)
			task->tk_timeout = xprt->timeout.to_maxval;
	} else
		task->tk_timeout = req->rq_timeout;
	/* Don't race with disconnect */
	if (!xprt_connected(xprt))
		task->tk_status = -ENOTCONN;
	else if (!req->rq_received)
		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}
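
/*
 * Worked example of the receive-timeout computation above (a sketch;
 * real values come from the cl_rtt estimator): if rpc_calc_rto()
 * yields HZ/4 and the request has one recorded timeout (rpc_ntimeo()
 * == 1) plus one retransmission (rq_retries == 1), then
 *
 *	tk_timeout = (HZ/4) << (1 + 1) = HZ
 *
 * i.e. the estimated RTO doubles for each timeout/retransmission,
 * clamped to xprt->timeout.to_maxval.
 */
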
/*
 * Reserve an RPC call slot.
 */
static inline void
do_xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp)
		return;
	if (!list_empty(&xprt->free)) {
		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del_init(&req->rq_list);
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);
		return;
	}
	dprintk("RPC: waiting for request slot\n");
	task->tk_status = -EAGAIN;
	task->tk_timeout = 0;
	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

void
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	task->tk_status = -EIO;
	if (!xprt->shutdown) {
		spin_lock(&xprt->xprt_lock);
		do_xprt_reserve(task);
		spin_unlock(&xprt->xprt_lock);
		if (task->tk_rqstp)
			del_timer_sync(&xprt->timer);
	}
}

/*
 * Allocate a 'unique' XID
 */
static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
	return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
	get_random_bytes(&xprt->xid, sizeof(xprt->xid));
}

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = task->tk_rqstp;

	req->rq_timeout = xprt->timeout.to_initval;
	req->rq_task	= task;
	req->rq_xprt    = xprt;
	req->rq_xid     = xprt_alloc_xid(xprt);
	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
			req, ntohl(req->rq_xid));
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (!(req = task->tk_rqstp))
		return;
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	__xprt_put_cong(xprt, req);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	xprt->last_used = jiffies;
	if (list_empty(&xprt->recv) && !xprt->shutdown)
		mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT);
	spin_unlock_bh(&xprt->sock_lock);
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	spin_lock(&xprt->xprt_lock);
	list_add(&req->rq_list, &xprt->free);
	xprt_clear_backlog(xprt);
	spin_unlock(&xprt->xprt_lock);
}

/*
 * Set default timeout parameters
 */
static void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5, 5 * HZ);
	else
		xprt_set_timeout(to, 5, 60 * HZ);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_initval   =
	to->to_increment = incr;
	to->to_maxval    = incr * retr;
	to->to_retries   = retr;
	to->to_exponential = 0;
}
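
/*
 * Example (a sketch): xprt_set_timeout(to, 5, 60 * HZ), the TCP
 * default chosen above, yields to_initval = to_increment = 60s,
 * to_retries = 5 and to_maxval = 300s: a constant 60s between
 * retransmissions with at most five retries before the major
 * timeout fires.
 */
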
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	unsigned int entries;
	size_t slot_table_size;
	struct rpc_rqst	*req;

	dprintk("RPC: setting up %s transport...\n",
			proto == IPPROTO_UDP ? "UDP" : "TCP");

	entries = (proto == IPPROTO_TCP)?
		xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries;

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return ERR_PTR(-ENOMEM);
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
	xprt->max_reqs = entries;
	slot_table_size = entries * sizeof(xprt->slot[0]);
	xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
	if (xprt->slot == NULL) {
		kfree(xprt);
		return ERR_PTR(-ENOMEM);
	}
	memset(xprt->slot, 0, slot_table_size);

	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	if (xprt->stream) {
		xprt->cwnd = RPC_MAXCWND(xprt);
		xprt->nocong = 1;
		xprt->max_payload = (1U << 31) - 1;
	} else {
		xprt->cwnd = RPC_INITCWND;
		xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
	}
	spin_lock_init(&xprt->sock_lock);
	spin_lock_init(&xprt->xprt_lock);
	init_waitqueue_head(&xprt->cong_wait);

	INIT_LIST_HEAD(&xprt->free);
	INIT_LIST_HEAD(&xprt->recv);
	INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
	INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
	init_timer(&xprt->timer);
	xprt->timer.function = xprt_init_autodisconnect;
	xprt->timer.data = (unsigned long) xprt;
	xprt->last_used = jiffies;
	xprt->port = XPRT_MAX_RESVPORT;

	/* Set timeout parameters */
	if (to)
		xprt->timeout = *to;
	else
		xprt_default_timeout(&xprt->timeout, xprt->prot);

	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	/* initialize free list */
	for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--)
		list_add(&req->rq_list, &xprt->free);

	xprt_init_xid(xprt);

	/* Check whether we want to use a reserved port */
	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;

	dprintk("RPC: created transport %p with %u slots\n", xprt,
			xprt->max_reqs);

	return xprt;
}

/*
 * Bind to a reserved port
 */
static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sockaddr_in myaddr = {
		.sin_family = AF_INET,
	};
	int		err, port;

	/* Were we already bound to a given port? Try to reuse it */
	port = xprt->port;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
		if (err == 0) {
			xprt->port = port;
			return 0;
		}
		if (--port == 0)
			port = XPRT_MAX_RESVPORT;
	} while (err == -EADDRINUSE && port != xprt->port);

	printk("RPC: Can't bind to reserved port (%d).\n", -err);
	return err;
}
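
/*
 * Example of the search above (a sketch): with xprt->port == 800 and
 * ports 800..790 in use, bind is tried on 800, 799, ... downwards;
 * hitting port 0 wraps back to XPRT_MAX_RESVPORT (800), and the loop
 * gives up with the last bind error once it comes full circle to the
 * port it started from.
 */
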
static void
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock	*sk = sock->sk;

	if (xprt->inet)
		return;

	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_user_data = xprt;
	xprt->old_data_ready = sk->sk_data_ready;
	xprt->old_state_change = sk->sk_state_change;
	xprt->old_write_space = sk->sk_write_space;
	if (xprt->prot == IPPROTO_UDP) {
		sk->sk_data_ready = udp_data_ready;
		sk->sk_no_check = UDP_CSUM_NORCV;
		xprt_set_connected(xprt);
	} else {
		tcp_sk(sk)->nonagle = 1;	/* disable Nagle's algorithm */
		sk->sk_data_ready = tcp_data_ready;
		sk->sk_state_change = tcp_state_change;
		xprt_clear_connected(xprt);
	}
	sk->sk_write_space = xprt_write_space;

	/* Reset to new socket */
	xprt->sock = sock;
	xprt->inet = sk;
	write_unlock_bh(&sk->sk_callback_lock);
}

/*
 * Set socket buffer length
 */
void
xprt_sock_setbufsize(struct rpc_xprt *xprt)
{
	struct sock *sk = xprt->inet;

	if (xprt->stream)
		return;
	if (xprt->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2;
	}
	if (xprt->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
		sk->sk_write_space(sk);
	}
}

/*
 * Datastream sockets are created here, but xprt_connect will create
 * and connect stream sockets.
 */
static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport)
{
	struct socket	*sock;
	int		type, err;

	dprintk("RPC: xprt_create_socket(%s %d)\n",
			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

	if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		return NULL;
	}

	/* If the caller has the capability, bind to a reserved port */
	if (resvport && xprt_bindresvport(xprt, sock) < 0) {
		printk("RPC: can't bind to reserved port.\n");
		goto failed;
	}

	return sock;

failed:
	sock_release(sock);
	return NULL;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;

	xprt = xprt_setup(proto, sap, to);
	if (IS_ERR(xprt))
		dprintk("RPC: xprt_create_proto failed\n");
	else
		dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);
	return xprt;
}
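
/*
 * Illustrative usage sketch only (not from the original source): how
 * a client such as the NFS mount path might obtain a transport. The
 * address and port values here are hypothetical.
 */
#if 0
	struct sockaddr_in addr = {
		.sin_family = AF_INET,
		.sin_port   = htons(2049),	/* e.g. NFS */
	};
	struct rpc_xprt *xprt = xprt_create_proto(IPPROTO_UDP, &addr, NULL);

	if (IS_ERR(xprt))
		return PTR_ERR(xprt);
#endif
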
/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->resend);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	wake_up(&xprt->cong_wait);
	del_timer_sync(&xprt->timer);
}

/*
 * Clear the xprt backlog queue
 */
static int
xprt_clear_backlog(struct rpc_xprt *xprt) {
	rpc_wake_up_next(&xprt->backlog);
	wake_up(&xprt->cong_wait);
	return 1;
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);
	xprt_shutdown(xprt);
	xprt_disconnect(xprt);
	xprt_close(xprt);
	kfree(xprt->slot);
	kfree(xprt);

	return 0;
}