1 /* call.c: Rx call routines
3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
12 #include <linux/sched.h>
13 #include <linux/slab.h>
14 #include <linux/module.h>
15 #include <rxrpc/rxrpc.h>
16 #include <rxrpc/transport.h>
17 #include <rxrpc/peer.h>
18 #include <rxrpc/connection.h>
19 #include <rxrpc/call.h>
20 #include <rxrpc/message.h>
23 __RXACCT_DECL(atomic_t rxrpc_call_count
);
24 __RXACCT_DECL(atomic_t rxrpc_message_count
);
26 LIST_HEAD(rxrpc_calls
);
27 DECLARE_RWSEM(rxrpc_calls_sem
);
29 unsigned rxrpc_call_rcv_timeout
= HZ
/3;
30 static unsigned rxrpc_call_acks_timeout
= HZ
/3;
31 static unsigned rxrpc_call_dfr_ack_timeout
= HZ
/20;
32 static unsigned short rxrpc_call_max_resend
= HZ
/10;
34 const char *rxrpc_call_states
[] = {
47 const char *rxrpc_call_error_states
[] = {
55 const char *rxrpc_pkts
[] = {
57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug",
58 "?09", "?10", "?11", "?12", "?13", "?14", "?15"
61 static const char *rxrpc_acks
[] = {
62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
66 static const char _acktype
[] = "NA-";
68 static void rxrpc_call_receive_packet(struct rxrpc_call
*call
);
69 static void rxrpc_call_receive_data_packet(struct rxrpc_call
*call
,
70 struct rxrpc_message
*msg
);
71 static void rxrpc_call_receive_ack_packet(struct rxrpc_call
*call
,
72 struct rxrpc_message
*msg
);
73 static void rxrpc_call_definitively_ACK(struct rxrpc_call
*call
,
75 static void rxrpc_call_resend(struct rxrpc_call
*call
, rxrpc_seq_t highest
);
76 static int __rxrpc_call_read_data(struct rxrpc_call
*call
);
78 static int rxrpc_call_record_ACK(struct rxrpc_call
*call
,
79 struct rxrpc_message
*msg
,
83 static int rxrpc_call_flush(struct rxrpc_call
*call
);
/*
 * Debug helper: log the symbolic name of a call's current application state,
 * looked up in rxrpc_call_states[].
 * - the argument is parenthesised so that any pointer expression may be
 *   passed safely
 * - the trailing semicolon is kept as in the original definition so that
 *   existing users continue to expand exactly as before
 */
#define _state(call) \
	_debug("[[[ state %s ]]]", rxrpc_call_states[(call)->app_call_state]);
88 static void rxrpc_call_default_attn_func(struct rxrpc_call
*call
)
90 wake_up(&call
->waitq
);
93 static void rxrpc_call_default_error_func(struct rxrpc_call
*call
)
95 wake_up(&call
->waitq
);
98 static void rxrpc_call_default_aemap_func(struct rxrpc_call
*call
)
100 switch (call
->app_err_state
) {
101 case RXRPC_ESTATE_LOCAL_ABORT
:
102 call
->app_abort_code
= -call
->app_errno
;
103 case RXRPC_ESTATE_PEER_ABORT
:
104 call
->app_errno
= -ECONNABORTED
;
110 static void __rxrpc_call_acks_timeout(unsigned long _call
)
112 struct rxrpc_call
*call
= (struct rxrpc_call
*) _call
;
114 _debug("ACKS TIMEOUT %05lu", jiffies
- call
->cjif
);
116 call
->flags
|= RXRPC_CALL_ACKS_TIMO
;
117 rxrpc_krxiod_queue_call(call
);
120 static void __rxrpc_call_rcv_timeout(unsigned long _call
)
122 struct rxrpc_call
*call
= (struct rxrpc_call
*) _call
;
124 _debug("RCV TIMEOUT %05lu", jiffies
- call
->cjif
);
126 call
->flags
|= RXRPC_CALL_RCV_TIMO
;
127 rxrpc_krxiod_queue_call(call
);
130 static void __rxrpc_call_ackr_timeout(unsigned long _call
)
132 struct rxrpc_call
*call
= (struct rxrpc_call
*) _call
;
134 _debug("ACKR TIMEOUT %05lu",jiffies
- call
->cjif
);
136 call
->flags
|= RXRPC_CALL_ACKR_TIMO
;
137 rxrpc_krxiod_queue_call(call
);
140 /*****************************************************************************/
142 * calculate a timeout based on an RTT value
144 static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call
*call
,
147 unsigned long expiry
= call
->conn
->peer
->rtt
/ (1000000 / HZ
);
150 if (expiry
< HZ
/ 25)
155 _leave(" = %lu jiffies", expiry
);
156 return jiffies
+ expiry
;
157 } /* end __rxrpc_rtt_based_timeout() */
159 /*****************************************************************************/
161 * create a new call record
163 static inline int __rxrpc_create_call(struct rxrpc_connection
*conn
,
164 struct rxrpc_call
**_call
)
166 struct rxrpc_call
*call
;
170 /* allocate and initialise a call record */
171 call
= (struct rxrpc_call
*) get_zeroed_page(GFP_KERNEL
);
177 atomic_set(&call
->usage
, 1);
179 init_waitqueue_head(&call
->waitq
);
180 spin_lock_init(&call
->lock
);
181 INIT_LIST_HEAD(&call
->link
);
182 INIT_LIST_HEAD(&call
->acks_pendq
);
183 INIT_LIST_HEAD(&call
->rcv_receiveq
);
184 INIT_LIST_HEAD(&call
->rcv_krxiodq_lk
);
185 INIT_LIST_HEAD(&call
->app_readyq
);
186 INIT_LIST_HEAD(&call
->app_unreadyq
);
187 INIT_LIST_HEAD(&call
->app_link
);
188 INIT_LIST_HEAD(&call
->app_attn_link
);
190 init_timer(&call
->acks_timeout
);
191 call
->acks_timeout
.data
= (unsigned long) call
;
192 call
->acks_timeout
.function
= __rxrpc_call_acks_timeout
;
194 init_timer(&call
->rcv_timeout
);
195 call
->rcv_timeout
.data
= (unsigned long) call
;
196 call
->rcv_timeout
.function
= __rxrpc_call_rcv_timeout
;
198 init_timer(&call
->ackr_dfr_timo
);
199 call
->ackr_dfr_timo
.data
= (unsigned long) call
;
200 call
->ackr_dfr_timo
.function
= __rxrpc_call_ackr_timeout
;
203 call
->ackr_win_bot
= 1;
204 call
->ackr_win_top
= call
->ackr_win_bot
+ RXRPC_CALL_ACK_WINDOW_SIZE
- 1;
205 call
->ackr_prev_seq
= 0;
206 call
->app_mark
= RXRPC_APP_MARK_EOF
;
207 call
->app_attn_func
= rxrpc_call_default_attn_func
;
208 call
->app_error_func
= rxrpc_call_default_error_func
;
209 call
->app_aemap_func
= rxrpc_call_default_aemap_func
;
210 call
->app_scr_alloc
= call
->app_scratch
;
212 call
->cjif
= jiffies
;
214 _leave(" = 0 (%p)", call
);
219 } /* end __rxrpc_create_call() */
221 /*****************************************************************************/
223 * create a new call record for outgoing calls
225 int rxrpc_create_call(struct rxrpc_connection
*conn
,
226 rxrpc_call_attn_func_t attn
,
227 rxrpc_call_error_func_t error
,
228 rxrpc_call_aemap_func_t aemap
,
229 struct rxrpc_call
**_call
)
231 DECLARE_WAITQUEUE(myself
, current
);
233 struct rxrpc_call
*call
;
238 /* allocate and initialise a call record */
239 ret
= __rxrpc_create_call(conn
, &call
);
241 _leave(" = %d", ret
);
245 call
->app_call_state
= RXRPC_CSTATE_CLNT_SND_ARGS
;
247 call
->app_attn_func
= attn
;
249 call
->app_error_func
= error
;
251 call
->app_aemap_func
= aemap
;
255 spin_lock(&conn
->lock
);
256 set_current_state(TASK_INTERRUPTIBLE
);
257 add_wait_queue(&conn
->chanwait
, &myself
);
260 /* try to find an unused channel */
261 for (cix
= 0; cix
< 4; cix
++)
262 if (!conn
->channels
[cix
])
265 /* no free channels - wait for one to become available */
267 if (signal_pending(current
))
270 spin_unlock(&conn
->lock
);
273 set_current_state(TASK_INTERRUPTIBLE
);
275 spin_lock(&conn
->lock
);
278 /* got a channel - now attach to the connection */
280 remove_wait_queue(&conn
->chanwait
, &myself
);
281 set_current_state(TASK_RUNNING
);
283 /* concoct a unique call number */
285 call
->call_id
= htonl(++conn
->call_counter
);
286 for (loop
= 0; loop
< 4; loop
++)
287 if (conn
->channels
[loop
] &&
288 conn
->channels
[loop
]->call_id
== call
->call_id
)
291 rxrpc_get_connection(conn
);
292 conn
->channels
[cix
] = call
; /* assign _after_ done callid check loop */
293 do_gettimeofday(&conn
->atime
);
294 call
->chan_ix
= htonl(cix
);
296 spin_unlock(&conn
->lock
);
298 down_write(&rxrpc_calls_sem
);
299 list_add_tail(&call
->call_link
, &rxrpc_calls
);
300 up_write(&rxrpc_calls_sem
);
302 __RXACCT(atomic_inc(&rxrpc_call_count
));
305 _leave(" = 0 (call=%p cix=%u)", call
, cix
);
309 remove_wait_queue(&conn
->chanwait
, &myself
);
310 set_current_state(TASK_RUNNING
);
311 spin_unlock(&conn
->lock
);
313 free_page((unsigned long) call
);
314 _leave(" = %d", ret
);
316 } /* end rxrpc_create_call() */
318 /*****************************************************************************/
320 * create a new call record for incoming calls
322 int rxrpc_incoming_call(struct rxrpc_connection
*conn
,
323 struct rxrpc_message
*msg
,
324 struct rxrpc_call
**_call
)
326 struct rxrpc_call
*call
;
330 cix
= ntohl(msg
->hdr
.cid
) & RXRPC_CHANNELMASK
;
332 _enter("%p,%u,%u", conn
, ntohl(msg
->hdr
.callNumber
), cix
);
334 /* allocate and initialise a call record */
335 ret
= __rxrpc_create_call(conn
, &call
);
337 _leave(" = %d", ret
);
341 call
->pkt_rcv_count
= 1;
342 call
->app_call_state
= RXRPC_CSTATE_SRVR_RCV_OPID
;
343 call
->app_mark
= sizeof(uint32_t);
347 /* attach to the connection */
349 call
->chan_ix
= htonl(cix
);
350 call
->call_id
= msg
->hdr
.callNumber
;
352 spin_lock(&conn
->lock
);
354 if (!conn
->channels
[cix
] ||
355 conn
->channels
[cix
]->app_call_state
== RXRPC_CSTATE_COMPLETE
||
356 conn
->channels
[cix
]->app_call_state
== RXRPC_CSTATE_ERROR
358 conn
->channels
[cix
] = call
;
359 rxrpc_get_connection(conn
);
363 spin_unlock(&conn
->lock
);
366 free_page((unsigned long) call
);
371 down_write(&rxrpc_calls_sem
);
372 list_add_tail(&call
->call_link
, &rxrpc_calls
);
373 up_write(&rxrpc_calls_sem
);
374 __RXACCT(atomic_inc(&rxrpc_call_count
));
378 _leave(" = %d [%p]", ret
, call
);
380 } /* end rxrpc_incoming_call() */
382 /*****************************************************************************/
386 void rxrpc_put_call(struct rxrpc_call
*call
)
388 struct rxrpc_connection
*conn
= call
->conn
;
389 struct rxrpc_message
*msg
;
391 _enter("%p{u=%d}",call
,atomic_read(&call
->usage
));
394 if (atomic_read(&call
->usage
) <= 0)
397 /* to prevent a race, the decrement and the de-list must be effectively
399 spin_lock(&conn
->lock
);
400 if (likely(!atomic_dec_and_test(&call
->usage
))) {
401 spin_unlock(&conn
->lock
);
406 if (conn
->channels
[ntohl(call
->chan_ix
)] == call
)
407 conn
->channels
[ntohl(call
->chan_ix
)] = NULL
;
409 spin_unlock(&conn
->lock
);
411 wake_up(&conn
->chanwait
);
413 rxrpc_put_connection(conn
);
415 /* clear the timers and dequeue from krxiod */
416 del_timer_sync(&call
->acks_timeout
);
417 del_timer_sync(&call
->rcv_timeout
);
418 del_timer_sync(&call
->ackr_dfr_timo
);
420 rxrpc_krxiod_dequeue_call(call
);
422 /* clean up the contents of the struct */
423 if (call
->snd_nextmsg
)
424 rxrpc_put_message(call
->snd_nextmsg
);
427 rxrpc_put_message(call
->snd_ping
);
429 while (!list_empty(&call
->acks_pendq
)) {
430 msg
= list_entry(call
->acks_pendq
.next
,
431 struct rxrpc_message
, link
);
432 list_del(&msg
->link
);
433 rxrpc_put_message(msg
);
436 while (!list_empty(&call
->rcv_receiveq
)) {
437 msg
= list_entry(call
->rcv_receiveq
.next
,
438 struct rxrpc_message
, link
);
439 list_del(&msg
->link
);
440 rxrpc_put_message(msg
);
443 while (!list_empty(&call
->app_readyq
)) {
444 msg
= list_entry(call
->app_readyq
.next
,
445 struct rxrpc_message
, link
);
446 list_del(&msg
->link
);
447 rxrpc_put_message(msg
);
450 while (!list_empty(&call
->app_unreadyq
)) {
451 msg
= list_entry(call
->app_unreadyq
.next
,
452 struct rxrpc_message
, link
);
453 list_del(&msg
->link
);
454 rxrpc_put_message(msg
);
457 module_put(call
->owner
);
459 down_write(&rxrpc_calls_sem
);
460 list_del(&call
->call_link
);
461 up_write(&rxrpc_calls_sem
);
463 __RXACCT(atomic_dec(&rxrpc_call_count
));
464 free_page((unsigned long) call
);
466 _leave(" [destroyed]");
467 } /* end rxrpc_put_call() */
469 /*****************************************************************************/
471 * actually generate a normal ACK
473 static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call
*call
,
476 struct rxrpc_message
*msg
;
481 /* ACKs default to DELAY */
482 if (!call
->ackr
.reason
)
483 call
->ackr
.reason
= RXRPC_ACK_DELAY
;
485 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
486 jiffies
- call
->cjif
,
487 ntohs(call
->ackr
.maxSkew
),
488 ntohl(call
->ackr
.firstPacket
),
489 ntohl(call
->ackr
.previousPacket
),
490 ntohl(call
->ackr
.serial
),
491 rxrpc_acks
[call
->ackr
.reason
],
494 aux
[0] = htonl(call
->conn
->peer
->if_mtu
); /* interface MTU */
495 aux
[1] = htonl(1444); /* max MTU */
496 aux
[2] = htonl(16); /* rwind */
497 aux
[3] = htonl(4); /* max packets */
499 diov
[0].iov_len
= sizeof(struct rxrpc_ackpacket
);
500 diov
[0].iov_base
= &call
->ackr
;
501 diov
[1].iov_len
= call
->ackr_pend_cnt
+ 3;
502 diov
[1].iov_base
= call
->ackr_array
;
503 diov
[2].iov_len
= sizeof(aux
);
504 diov
[2].iov_base
= &aux
;
506 /* build and send the message */
507 ret
= rxrpc_conn_newmsg(call
->conn
,call
, RXRPC_PACKET_TYPE_ACK
,
508 3, diov
, GFP_KERNEL
, &msg
);
513 msg
->hdr
.seq
= htonl(seq
);
514 msg
->hdr
.flags
|= RXRPC_SLOW_START_OK
;
516 ret
= rxrpc_conn_sendmsg(call
->conn
, msg
);
517 rxrpc_put_message(msg
);
520 call
->pkt_snd_count
++;
522 /* count how many actual ACKs there were at the front */
523 for (delta
= 0; delta
< call
->ackr_pend_cnt
; delta
++)
524 if (call
->ackr_array
[delta
] != RXRPC_ACK_TYPE_ACK
)
527 call
->ackr_pend_cnt
-= delta
; /* all ACK'd to this point */
529 /* crank the ACK window around */
531 /* un-ACK'd window */
533 else if (delta
< RXRPC_CALL_ACK_WINDOW_SIZE
) {
534 /* partially ACK'd window
535 * - shuffle down to avoid losing out-of-sequence packets
537 call
->ackr_win_bot
+= delta
;
538 call
->ackr_win_top
+= delta
;
540 memmove(&call
->ackr_array
[0],
541 &call
->ackr_array
[delta
],
542 call
->ackr_pend_cnt
);
544 memset(&call
->ackr_array
[call
->ackr_pend_cnt
],
546 sizeof(call
->ackr_array
) - call
->ackr_pend_cnt
);
549 /* fully ACK'd window
550 * - just clear the whole thing
552 memset(&call
->ackr_array
,
554 sizeof(call
->ackr_array
));
558 memset(&call
->ackr
, 0, sizeof(call
->ackr
));
561 if (!call
->app_call_state
)
562 printk("___ STATE 0 ___\n");
564 } /* end __rxrpc_call_gen_normal_ACK() */
566 /*****************************************************************************/
568 * note the reception of a packet in the call's ACK records and generate an
569 * appropriate ACK packet if necessary
570 * - returns 0 if packet should be processed, 1 if packet should be ignored
571 * and -ve on an error
573 static int rxrpc_call_generate_ACK(struct rxrpc_call
*call
,
574 struct rxrpc_header
*hdr
,
575 struct rxrpc_ackpacket
*ack
)
577 struct rxrpc_message
*msg
;
581 u8 special_ACK
, do_ACK
, force
;
583 _enter("%p,%p { seq=%d tp=%d fl=%02x }",
584 call
, hdr
, ntohl(hdr
->seq
), hdr
->type
, hdr
->flags
);
586 seq
= ntohl(hdr
->seq
);
587 offset
= seq
- call
->ackr_win_bot
;
588 do_ACK
= RXRPC_ACK_DELAY
;
592 if (call
->ackr_high_seq
< seq
)
593 call
->ackr_high_seq
= seq
;
595 /* deal with generation of obvious special ACKs first */
596 if (ack
&& ack
->reason
== RXRPC_ACK_PING
) {
597 special_ACK
= RXRPC_ACK_PING_RESPONSE
;
602 if (seq
< call
->ackr_win_bot
) {
603 special_ACK
= RXRPC_ACK_DUPLICATE
;
608 if (seq
>= call
->ackr_win_top
) {
609 special_ACK
= RXRPC_ACK_EXCEEDS_WINDOW
;
614 if (call
->ackr_array
[offset
] != RXRPC_ACK_TYPE_NACK
) {
615 special_ACK
= RXRPC_ACK_DUPLICATE
;
620 /* okay... it's a normal data packet inside the ACK window */
621 call
->ackr_array
[offset
] = RXRPC_ACK_TYPE_ACK
;
623 if (offset
< call
->ackr_pend_cnt
) {
625 else if (offset
> call
->ackr_pend_cnt
) {
626 do_ACK
= RXRPC_ACK_OUT_OF_SEQUENCE
;
627 call
->ackr_pend_cnt
= offset
;
631 if (hdr
->flags
& RXRPC_REQUEST_ACK
) {
632 do_ACK
= RXRPC_ACK_REQUESTED
;
635 /* generate an ACK on the final packet of a reply just received */
636 if (hdr
->flags
& RXRPC_LAST_PACKET
) {
637 if (call
->conn
->out_clientflag
)
640 else if (!(hdr
->flags
& RXRPC_MORE_PACKETS
)) {
641 do_ACK
= RXRPC_ACK_REQUESTED
;
644 /* re-ACK packets previously received out-of-order */
645 for (offset
++; offset
< RXRPC_CALL_ACK_WINDOW_SIZE
; offset
++)
646 if (call
->ackr_array
[offset
] != RXRPC_ACK_TYPE_ACK
)
649 call
->ackr_pend_cnt
= offset
;
651 /* generate an ACK if we fill up the window */
652 if (call
->ackr_pend_cnt
>= RXRPC_CALL_ACK_WINDOW_SIZE
)
656 _debug("%05lu ACKs pend=%u norm=%s special=%s%s",
657 jiffies
- call
->cjif
,
660 rxrpc_acks
[special_ACK
],
661 force
? " immediate" :
662 do_ACK
== RXRPC_ACK_REQUESTED
? " merge-req" :
663 hdr
->flags
& RXRPC_LAST_PACKET
? " finalise" :
667 /* send any pending normal ACKs if need be */
668 if (call
->ackr_pend_cnt
> 0) {
669 /* fill out the appropriate form */
670 call
->ackr
.bufferSpace
= htons(RXRPC_CALL_ACK_WINDOW_SIZE
);
671 call
->ackr
.maxSkew
= htons(min(call
->ackr_high_seq
- seq
,
673 call
->ackr
.firstPacket
= htonl(call
->ackr_win_bot
);
674 call
->ackr
.previousPacket
= call
->ackr_prev_seq
;
675 call
->ackr
.serial
= hdr
->serial
;
676 call
->ackr
.nAcks
= call
->ackr_pend_cnt
;
678 if (do_ACK
== RXRPC_ACK_REQUESTED
)
679 call
->ackr
.reason
= do_ACK
;
681 /* generate the ACK immediately if necessary */
682 if (special_ACK
|| force
) {
683 err
= __rxrpc_call_gen_normal_ACK(
684 call
, do_ACK
== RXRPC_ACK_DELAY
? 0 : seq
);
692 if (call
->ackr
.reason
== RXRPC_ACK_REQUESTED
)
693 call
->ackr_dfr_seq
= seq
;
695 /* start the ACK timer if not running if there are any pending deferred
697 if (call
->ackr_pend_cnt
> 0 &&
698 call
->ackr
.reason
!= RXRPC_ACK_REQUESTED
&&
699 !timer_pending(&call
->ackr_dfr_timo
)
703 timo
= rxrpc_call_dfr_ack_timeout
+ jiffies
;
705 _debug("START ACKR TIMER for cj=%lu", timo
- call
->cjif
);
707 spin_lock(&call
->lock
);
708 mod_timer(&call
->ackr_dfr_timo
, timo
);
709 spin_unlock(&call
->lock
);
711 else if ((call
->ackr_pend_cnt
== 0 ||
712 call
->ackr
.reason
== RXRPC_ACK_REQUESTED
) &&
713 timer_pending(&call
->ackr_dfr_timo
)
715 /* stop timer if no pending ACKs */
716 _debug("CLEAR ACKR TIMER");
717 del_timer_sync(&call
->ackr_dfr_timo
);
720 /* send a special ACK if one is required */
722 struct rxrpc_ackpacket ack
;
724 uint8_t acks
[1] = { RXRPC_ACK_TYPE_ACK
};
726 /* fill out the appropriate form */
727 ack
.bufferSpace
= htons(RXRPC_CALL_ACK_WINDOW_SIZE
);
728 ack
.maxSkew
= htons(min(call
->ackr_high_seq
- seq
,
730 ack
.firstPacket
= htonl(call
->ackr_win_bot
);
731 ack
.previousPacket
= call
->ackr_prev_seq
;
732 ack
.serial
= hdr
->serial
;
733 ack
.reason
= special_ACK
;
736 _proto("Rx Sending s-ACK"
737 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
739 ntohl(ack
.firstPacket
),
740 ntohl(ack
.previousPacket
),
742 rxrpc_acks
[ack
.reason
],
745 diov
[0].iov_len
= sizeof(struct rxrpc_ackpacket
);
746 diov
[0].iov_base
= &ack
;
747 diov
[1].iov_len
= sizeof(acks
);
748 diov
[1].iov_base
= acks
;
750 /* build and send the message */
751 err
= rxrpc_conn_newmsg(call
->conn
,call
, RXRPC_PACKET_TYPE_ACK
,
752 hdr
->seq
? 2 : 1, diov
,
761 msg
->hdr
.seq
= htonl(seq
);
762 msg
->hdr
.flags
|= RXRPC_SLOW_START_OK
;
764 err
= rxrpc_conn_sendmsg(call
->conn
, msg
);
765 rxrpc_put_message(msg
);
770 call
->pkt_snd_count
++;
775 call
->ackr_prev_seq
= hdr
->seq
;
777 _leave(" = %d", ret
);
779 } /* end rxrpc_call_generate_ACK() */
781 /*****************************************************************************/
783 * handle work to be done on a call
784 * - includes packet reception and timeout processing
786 void rxrpc_call_do_stuff(struct rxrpc_call
*call
)
788 _enter("%p{flags=%lx}", call
, call
->flags
);
790 /* handle packet reception */
791 if (call
->flags
& RXRPC_CALL_RCV_PKT
) {
792 _debug("- receive packet");
793 call
->flags
&= ~RXRPC_CALL_RCV_PKT
;
794 rxrpc_call_receive_packet(call
);
797 /* handle overdue ACKs */
798 if (call
->flags
& RXRPC_CALL_ACKS_TIMO
) {
799 _debug("- overdue ACK timeout");
800 call
->flags
&= ~RXRPC_CALL_ACKS_TIMO
;
801 rxrpc_call_resend(call
, call
->snd_seq_count
);
804 /* handle lack of reception */
805 if (call
->flags
& RXRPC_CALL_RCV_TIMO
) {
806 _debug("- reception timeout");
807 call
->flags
&= ~RXRPC_CALL_RCV_TIMO
;
808 rxrpc_call_abort(call
, -EIO
);
811 /* handle deferred ACKs */
812 if (call
->flags
& RXRPC_CALL_ACKR_TIMO
||
813 (call
->ackr
.nAcks
> 0 && call
->ackr
.reason
== RXRPC_ACK_REQUESTED
)
815 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
816 jiffies
- call
->cjif
,
817 rxrpc_acks
[call
->ackr
.reason
],
820 call
->flags
&= ~RXRPC_CALL_ACKR_TIMO
;
822 if (call
->ackr
.nAcks
> 0 &&
823 call
->app_call_state
!= RXRPC_CSTATE_ERROR
) {
825 __rxrpc_call_gen_normal_ACK(call
, call
->ackr_dfr_seq
);
826 call
->ackr_dfr_seq
= 0;
832 } /* end rxrpc_call_do_stuff() */
834 /*****************************************************************************/
836 * send an abort message at call or connection level
837 * - must be called with call->lock held
838 * - the supplied error code is sent as the packet data
840 static int __rxrpc_call_abort(struct rxrpc_call
*call
, int errno
)
842 struct rxrpc_connection
*conn
= call
->conn
;
843 struct rxrpc_message
*msg
;
848 _enter("%p{%08x},%p{%d},%d",
849 conn
, ntohl(conn
->conn_id
), call
, ntohl(call
->call_id
), errno
);
851 /* if this call is already aborted, then just wake up any waiters */
852 if (call
->app_call_state
== RXRPC_CSTATE_ERROR
) {
853 spin_unlock(&call
->lock
);
854 call
->app_error_func(call
);
859 rxrpc_get_call(call
);
861 /* change the state _with_ the lock still held */
862 call
->app_call_state
= RXRPC_CSTATE_ERROR
;
863 call
->app_err_state
= RXRPC_ESTATE_LOCAL_ABORT
;
864 call
->app_errno
= errno
;
865 call
->app_mark
= RXRPC_APP_MARK_EOF
;
866 call
->app_read_buf
= NULL
;
867 call
->app_async_read
= 0;
871 /* ask the app to translate the error code */
872 call
->app_aemap_func(call
);
874 spin_unlock(&call
->lock
);
876 /* flush any outstanding ACKs */
877 del_timer_sync(&call
->acks_timeout
);
878 del_timer_sync(&call
->rcv_timeout
);
879 del_timer_sync(&call
->ackr_dfr_timo
);
881 if (rxrpc_call_is_ack_pending(call
))
882 __rxrpc_call_gen_normal_ACK(call
, 0);
884 /* send the abort packet only if we actually traded some other
887 if (call
->pkt_snd_count
|| call
->pkt_rcv_count
) {
888 /* actually send the abort */
889 _proto("Rx Sending Call ABORT { data=%d }",
890 call
->app_abort_code
);
892 _error
= htonl(call
->app_abort_code
);
894 diov
[0].iov_len
= sizeof(_error
);
895 diov
[0].iov_base
= &_error
;
897 ret
= rxrpc_conn_newmsg(conn
, call
, RXRPC_PACKET_TYPE_ABORT
,
898 1, diov
, GFP_KERNEL
, &msg
);
900 ret
= rxrpc_conn_sendmsg(conn
, msg
);
901 rxrpc_put_message(msg
);
905 /* tell the app layer to let go */
906 call
->app_error_func(call
);
908 rxrpc_put_call(call
);
910 _leave(" = %d", ret
);
912 } /* end __rxrpc_call_abort() */
914 /*****************************************************************************/
916 * send an abort message at call or connection level
917 * - the supplied error code is sent as the packet data
919 int rxrpc_call_abort(struct rxrpc_call
*call
, int error
)
921 spin_lock(&call
->lock
);
923 return __rxrpc_call_abort(call
, error
);
925 } /* end rxrpc_call_abort() */
927 /*****************************************************************************/
929 * process packets waiting for this call
931 static void rxrpc_call_receive_packet(struct rxrpc_call
*call
)
933 struct rxrpc_message
*msg
;
934 struct list_head
*_p
;
938 rxrpc_get_call(call
); /* must not go away too soon if aborted by
941 while (!list_empty(&call
->rcv_receiveq
)) {
942 /* try to get next packet */
944 spin_lock(&call
->lock
);
945 if (!list_empty(&call
->rcv_receiveq
)) {
946 _p
= call
->rcv_receiveq
.next
;
949 spin_unlock(&call
->lock
);
954 msg
= list_entry(_p
, struct rxrpc_message
, link
);
956 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
957 jiffies
- call
->cjif
,
958 rxrpc_pkts
[msg
->hdr
.type
],
959 ntohl(msg
->hdr
.serial
),
961 msg
->hdr
.flags
& RXRPC_JUMBO_PACKET
? 'j' : '-',
962 msg
->hdr
.flags
& RXRPC_MORE_PACKETS
? 'm' : '-',
963 msg
->hdr
.flags
& RXRPC_LAST_PACKET
? 'l' : '-',
964 msg
->hdr
.flags
& RXRPC_REQUEST_ACK
? 'r' : '-',
965 msg
->hdr
.flags
& RXRPC_CLIENT_INITIATED
? 'C' : 'S'
968 switch (msg
->hdr
.type
) {
969 /* deal with data packets */
970 case RXRPC_PACKET_TYPE_DATA
:
971 /* ACK the packet if necessary */
972 switch (rxrpc_call_generate_ACK(call
, &msg
->hdr
,
974 case 0: /* useful packet */
975 rxrpc_call_receive_data_packet(call
, msg
);
977 case 1: /* duplicate or out-of-window packet */
980 rxrpc_put_message(msg
);
985 /* deal with ACK packets */
986 case RXRPC_PACKET_TYPE_ACK
:
987 rxrpc_call_receive_ack_packet(call
, msg
);
990 /* deal with abort packets */
991 case RXRPC_PACKET_TYPE_ABORT
: {
994 dp
= skb_header_pointer(msg
->pkt
, msg
->offset
,
995 sizeof(_dbuf
), &_dbuf
);
997 printk("Rx Received short ABORT packet\n");
999 _proto("Rx Received Call ABORT { data=%d }",
1000 (dp
? ntohl(*dp
) : 0));
1002 spin_lock(&call
->lock
);
1003 call
->app_call_state
= RXRPC_CSTATE_ERROR
;
1004 call
->app_err_state
= RXRPC_ESTATE_PEER_ABORT
;
1005 call
->app_abort_code
= (dp
? ntohl(*dp
) : 0);
1006 call
->app_errno
= -ECONNABORTED
;
1007 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1008 call
->app_read_buf
= NULL
;
1009 call
->app_async_read
= 0;
1011 /* ask the app to translate the error code */
1012 call
->app_aemap_func(call
);
1014 spin_unlock(&call
->lock
);
1015 call
->app_error_func(call
);
1019 /* deal with other packet types */
1020 _proto("Rx Unsupported packet type %u (#%u)",
1021 msg
->hdr
.type
, msg
->seq
);
1025 rxrpc_put_message(msg
);
1029 rxrpc_put_call(call
);
1031 } /* end rxrpc_call_receive_packet() */
1033 /*****************************************************************************/
1035 * process next data packet
1036 * - as the next data packet arrives:
1037 * - it is queued on app_readyq _if_ it is the next one expected
1039 * - it is queued on app_unreadyq _if_ it is not the next one expected
1040 * - if a packet placed on app_readyq completely fills a hole leading up to
1041 * the first packet on app_unreadyq, then packets now in sequence are
1042 * transferred to app_readyq
1043 * - the application layer can only see packets on app_readyq
1044 * (app_ready_qty bytes)
1045 * - the application layer is prodded every time a new packet arrives
1047 static void rxrpc_call_receive_data_packet(struct rxrpc_call
*call
,
1048 struct rxrpc_message
*msg
)
1050 const struct rxrpc_operation
*optbl
, *op
;
1051 struct rxrpc_message
*pmsg
;
1052 struct list_head
*_p
;
1053 int ret
, lo
, hi
, rmtimo
;
1056 _enter("%p{%u},%p{%u}", call
, ntohl(call
->call_id
), msg
, msg
->seq
);
1058 rxrpc_get_message(msg
);
1060 /* add to the unready queue if we'd have to create a hole in the ready
1061 * queue otherwise */
1062 if (msg
->seq
!= call
->app_ready_seq
+ 1) {
1063 _debug("Call add packet %d to unreadyq", msg
->seq
);
1065 /* insert in seq order */
1066 list_for_each(_p
, &call
->app_unreadyq
) {
1067 pmsg
= list_entry(_p
, struct rxrpc_message
, link
);
1068 if (pmsg
->seq
> msg
->seq
)
1072 list_add_tail(&msg
->link
, _p
);
1074 _leave(" [unreadyq]");
1078 /* next in sequence - simply append into the call's ready queue */
1079 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",
1080 msg
->seq
, msg
->dsize
, call
->app_ready_qty
);
1082 spin_lock(&call
->lock
);
1083 call
->app_ready_seq
= msg
->seq
;
1084 call
->app_ready_qty
+= msg
->dsize
;
1085 list_add_tail(&msg
->link
, &call
->app_readyq
);
1087 /* move unready packets to the readyq if we got rid of a hole */
1088 while (!list_empty(&call
->app_unreadyq
)) {
1089 pmsg
= list_entry(call
->app_unreadyq
.next
,
1090 struct rxrpc_message
, link
);
1092 if (pmsg
->seq
!= call
->app_ready_seq
+ 1)
1095 /* next in sequence - just move list-to-list */
1096 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",
1097 pmsg
->seq
, pmsg
->dsize
, call
->app_ready_qty
);
1099 call
->app_ready_seq
= pmsg
->seq
;
1100 call
->app_ready_qty
+= pmsg
->dsize
;
1101 list_move_tail(&pmsg
->link
, &call
->app_readyq
);
1104 /* see if we've got the last packet yet */
1105 if (!list_empty(&call
->app_readyq
)) {
1106 pmsg
= list_entry(call
->app_readyq
.prev
,
1107 struct rxrpc_message
, link
);
1108 if (pmsg
->hdr
.flags
& RXRPC_LAST_PACKET
) {
1109 call
->app_last_rcv
= 1;
1110 _debug("Last packet on readyq");
1114 switch (call
->app_call_state
) {
1115 /* do nothing if call already aborted */
1116 case RXRPC_CSTATE_ERROR
:
1117 spin_unlock(&call
->lock
);
1121 /* extract the operation ID from an incoming call if that's not
1123 case RXRPC_CSTATE_SRVR_RCV_OPID
:
1124 spin_unlock(&call
->lock
);
1126 /* handle as yet insufficient data for the operation ID */
1127 if (call
->app_ready_qty
< 4) {
1128 if (call
->app_last_rcv
)
1129 /* trouble - last packet seen */
1130 rxrpc_call_abort(call
, -EINVAL
);
1136 /* pull the operation ID out of the buffer */
1137 ret
= rxrpc_call_read_data(call
, &opid
, sizeof(opid
), 0);
1139 printk("Unexpected error from read-data: %d\n", ret
);
1140 if (call
->app_call_state
!= RXRPC_CSTATE_ERROR
)
1141 rxrpc_call_abort(call
, ret
);
1145 call
->app_opcode
= ntohl(opid
);
1147 /* locate the operation in the available ops table */
1148 optbl
= call
->conn
->service
->ops_begin
;
1150 hi
= call
->conn
->service
->ops_end
- optbl
;
1153 int mid
= (hi
+ lo
) / 2;
1155 if (call
->app_opcode
== op
->id
)
1157 if (call
->app_opcode
> op
->id
)
1164 kproto("Rx Client requested operation %d from %s service",
1165 call
->app_opcode
, call
->conn
->service
->name
);
1166 rxrpc_call_abort(call
, -EINVAL
);
1171 _proto("Rx Client requested operation %s from %s service",
1172 op
->name
, call
->conn
->service
->name
);
1174 /* we're now waiting for the argument block (unless the call
1176 spin_lock(&call
->lock
);
1177 if (call
->app_call_state
== RXRPC_CSTATE_SRVR_RCV_OPID
||
1178 call
->app_call_state
== RXRPC_CSTATE_SRVR_SND_REPLY
) {
1179 if (!call
->app_last_rcv
)
1180 call
->app_call_state
=
1181 RXRPC_CSTATE_SRVR_RCV_ARGS
;
1182 else if (call
->app_ready_qty
> 0)
1183 call
->app_call_state
=
1184 RXRPC_CSTATE_SRVR_GOT_ARGS
;
1186 call
->app_call_state
=
1187 RXRPC_CSTATE_SRVR_SND_REPLY
;
1188 call
->app_mark
= op
->asize
;
1189 call
->app_user
= op
->user
;
1191 spin_unlock(&call
->lock
);
1196 case RXRPC_CSTATE_SRVR_RCV_ARGS
:
1197 /* change state if just received last packet of arg block */
1198 if (call
->app_last_rcv
)
1199 call
->app_call_state
= RXRPC_CSTATE_SRVR_GOT_ARGS
;
1200 spin_unlock(&call
->lock
);
1205 case RXRPC_CSTATE_CLNT_RCV_REPLY
:
1206 /* change state if just received last packet of reply block */
1208 if (call
->app_last_rcv
) {
1209 call
->app_call_state
= RXRPC_CSTATE_CLNT_GOT_REPLY
;
1212 spin_unlock(&call
->lock
);
1215 del_timer_sync(&call
->acks_timeout
);
1216 del_timer_sync(&call
->rcv_timeout
);
1217 del_timer_sync(&call
->ackr_dfr_timo
);
1224 /* deal with data reception in an unexpected state */
1225 printk("Unexpected state [[[ %u ]]]\n", call
->app_call_state
);
1226 __rxrpc_call_abort(call
, -EBADMSG
);
1231 if (call
->app_call_state
== RXRPC_CSTATE_CLNT_RCV_REPLY
&&
1235 /* otherwise just invoke the data function whenever we can satisfy its desire for more
1238 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s",
1239 call
->app_call_state
, call
->app_ready_qty
, call
->app_mark
,
1240 call
->app_last_rcv
? " last-rcvd" : "");
1242 spin_lock(&call
->lock
);
1244 ret
= __rxrpc_call_read_data(call
);
1247 spin_unlock(&call
->lock
);
1248 call
->app_attn_func(call
);
1251 spin_unlock(&call
->lock
);
1254 spin_unlock(&call
->lock
);
1257 __rxrpc_call_abort(call
, ret
);
1265 } /* end rxrpc_call_receive_data_packet() */
1267 /*****************************************************************************/
1269 * received an ACK packet
1271 static void rxrpc_call_receive_ack_packet(struct rxrpc_call
*call
,
1272 struct rxrpc_message
*msg
)
1274 struct rxrpc_ackpacket _ack
, *ap
;
1275 rxrpc_serial_net_t serial
;
1279 _enter("%p{%u},%p{%u}", call
, ntohl(call
->call_id
), msg
, msg
->seq
);
1281 /* extract the basic ACK record */
1282 ap
= skb_header_pointer(msg
->pkt
, msg
->offset
, sizeof(_ack
), &_ack
);
1284 printk("Rx Received short ACK packet\n");
1287 msg
->offset
+= sizeof(_ack
);
1289 serial
= ap
->serial
;
1290 seq
= ntohl(ap
->firstPacket
);
1292 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }",
1293 ntohl(msg
->hdr
.serial
),
1294 ntohs(ap
->bufferSpace
),
1297 ntohl(ap
->previousPacket
),
1299 rxrpc_acks
[ap
->reason
],
1303 /* check the other side isn't ACK'ing a sequence number I haven't sent
1305 if (ap
->nAcks
> 0 &&
1306 (seq
> call
->snd_seq_count
||
1307 seq
+ ap
->nAcks
- 1 > call
->snd_seq_count
)) {
1308 printk("Received ACK (#%u-#%u) for unsent packet\n",
1309 seq
, seq
+ ap
->nAcks
- 1);
1310 rxrpc_call_abort(call
, -EINVAL
);
1315 /* deal with RTT calculation */
1317 struct rxrpc_message
*rttmsg
;
1319 /* find the prompting packet */
1320 spin_lock(&call
->lock
);
1321 if (call
->snd_ping
&& call
->snd_ping
->hdr
.serial
== serial
) {
1322 /* it was a ping packet */
1323 rttmsg
= call
->snd_ping
;
1324 call
->snd_ping
= NULL
;
1325 spin_unlock(&call
->lock
);
1328 rttmsg
->rttdone
= 1;
1329 rxrpc_peer_calculate_rtt(call
->conn
->peer
,
1331 rxrpc_put_message(rttmsg
);
1335 struct list_head
*_p
;
1337 /* it ought to be a data packet - look in the pending
1339 list_for_each(_p
, &call
->acks_pendq
) {
1340 rttmsg
= list_entry(_p
, struct rxrpc_message
,
1342 if (rttmsg
->hdr
.serial
== serial
) {
1343 if (rttmsg
->rttdone
)
1344 /* never do RTT twice without
1348 rttmsg
->rttdone
= 1;
1349 rxrpc_peer_calculate_rtt(
1350 call
->conn
->peer
, rttmsg
, msg
);
1354 spin_unlock(&call
->lock
);
1358 switch (ap
->reason
) {
1359 /* deal with negative/positive acknowledgement of data
1361 case RXRPC_ACK_REQUESTED
:
1362 case RXRPC_ACK_DELAY
:
1363 case RXRPC_ACK_IDLE
:
1364 rxrpc_call_definitively_ACK(call
, seq
- 1);
1366 case RXRPC_ACK_DUPLICATE
:
1367 case RXRPC_ACK_OUT_OF_SEQUENCE
:
1368 case RXRPC_ACK_EXCEEDS_WINDOW
:
1369 call
->snd_resend_cnt
= 0;
1370 ret
= rxrpc_call_record_ACK(call
, msg
, seq
, ap
->nAcks
);
1372 rxrpc_call_abort(call
, ret
);
1375 /* respond to ping packets immediately */
1376 case RXRPC_ACK_PING
:
1377 rxrpc_call_generate_ACK(call
, &msg
->hdr
, ap
);
1380 /* only record RTT on ping response packets */
1381 case RXRPC_ACK_PING_RESPONSE
:
1382 if (call
->snd_ping
) {
1383 struct rxrpc_message
*rttmsg
;
1385 /* only do RTT stuff if the response matches the
1388 spin_lock(&call
->lock
);
1389 if (call
->snd_ping
&&
1390 call
->snd_ping
->hdr
.serial
== ap
->serial
) {
1391 rttmsg
= call
->snd_ping
;
1392 call
->snd_ping
= NULL
;
1394 spin_unlock(&call
->lock
);
1397 rttmsg
->rttdone
= 1;
1398 rxrpc_peer_calculate_rtt(call
->conn
->peer
,
1400 rxrpc_put_message(rttmsg
);
1406 printk("Unsupported ACK reason %u\n", ap
->reason
);
1411 } /* end rxrpc_call_receive_ack_packet() */
1413 /*****************************************************************************/
1415 * record definitive ACKs for all messages up to and including the one with the
1418 static void rxrpc_call_definitively_ACK(struct rxrpc_call
*call
,
1419 rxrpc_seq_t highest
)
1421 struct rxrpc_message
*msg
;
1424 _enter("%p{ads=%u},%u", call
, call
->acks_dftv_seq
, highest
);
1426 while (call
->acks_dftv_seq
< highest
) {
1427 call
->acks_dftv_seq
++;
1429 _proto("Definitive ACK on packet #%u", call
->acks_dftv_seq
);
1431 /* discard those at front of queue until message with highest
1433 spin_lock(&call
->lock
);
1435 if (!list_empty(&call
->acks_pendq
)) {
1436 msg
= list_entry(call
->acks_pendq
.next
,
1437 struct rxrpc_message
, link
);
1438 list_del_init(&msg
->link
); /* dequeue */
1439 if (msg
->state
== RXRPC_MSG_SENT
)
1440 call
->acks_pend_cnt
--;
1442 spin_unlock(&call
->lock
);
1444 /* insanity check */
1446 panic("%s(): acks_pendq unexpectedly empty\n",
1449 if (msg
->seq
!= call
->acks_dftv_seq
)
1450 panic("%s(): Packet #%u expected at front of acks_pendq"
1452 __FUNCTION__
, call
->acks_dftv_seq
, msg
->seq
);
1454 /* discard the message */
1455 msg
->state
= RXRPC_MSG_DONE
;
1456 rxrpc_put_message(msg
);
1459 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */
1461 spin_lock(&call
->lock
);
1462 if (call
->acks_dftv_seq
== call
->snd_seq_count
) {
1463 if (call
->app_call_state
!= RXRPC_CSTATE_COMPLETE
) {
1464 call
->app_call_state
= RXRPC_CSTATE_COMPLETE
;
1469 spin_unlock(&call
->lock
);
1472 del_timer_sync(&call
->acks_timeout
);
1473 del_timer_sync(&call
->rcv_timeout
);
1474 del_timer_sync(&call
->ackr_dfr_timo
);
1475 call
->app_attn_func(call
);
1479 } /* end rxrpc_call_definitively_ACK() */
1481 /*****************************************************************************/
1483 * record the specified amount of ACKs/NAKs
1485 static int rxrpc_call_record_ACK(struct rxrpc_call
*call
,
1486 struct rxrpc_message
*msg
,
1490 struct rxrpc_message
*dmsg
;
1491 struct list_head
*_p
;
1492 rxrpc_seq_t highest
;
1495 char resend
, now_complete
;
1498 _enter("%p{apc=%u ads=%u},%p,%u,%Zu",
1499 call
, call
->acks_pend_cnt
, call
->acks_dftv_seq
,
1502 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order
1504 if (seq
<= call
->acks_dftv_seq
) {
1505 unsigned delta
= call
->acks_dftv_seq
- seq
;
1507 if (count
<= delta
) {
1508 _leave(" = 0 [all definitively ACK'd]");
1514 msg
->offset
+= delta
;
1517 highest
= seq
+ count
- 1;
1520 /* extract up to 16 ACK slots at a time */
1521 chunk
= min(count
, sizeof(acks
));
1524 memset(acks
, 2, sizeof(acks
));
1526 if (skb_copy_bits(msg
->pkt
, msg
->offset
, &acks
, chunk
) < 0) {
1527 printk("Rx Received short ACK packet\n");
1528 _leave(" = -EINVAL");
1531 msg
->offset
+= chunk
;
1533 /* check that the ACK set is valid */
1534 for (ix
= 0; ix
< chunk
; ix
++) {
1536 case RXRPC_ACK_TYPE_ACK
:
1538 case RXRPC_ACK_TYPE_NACK
:
1542 printk("Rx Received unsupported ACK state"
1544 _leave(" = -EINVAL");
1549 _proto("Rx ACK of packets #%u-#%u "
1550 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)",
1551 seq
, (unsigned) (seq
+ chunk
- 1),
1552 _acktype
[acks
[0x0]],
1553 _acktype
[acks
[0x1]],
1554 _acktype
[acks
[0x2]],
1555 _acktype
[acks
[0x3]],
1556 _acktype
[acks
[0x4]],
1557 _acktype
[acks
[0x5]],
1558 _acktype
[acks
[0x6]],
1559 _acktype
[acks
[0x7]],
1560 _acktype
[acks
[0x8]],
1561 _acktype
[acks
[0x9]],
1562 _acktype
[acks
[0xA]],
1563 _acktype
[acks
[0xB]],
1564 _acktype
[acks
[0xC]],
1565 _acktype
[acks
[0xD]],
1566 _acktype
[acks
[0xE]],
1567 _acktype
[acks
[0xF]],
1571 /* mark the packets in the ACK queue as being provisionally
1574 spin_lock(&call
->lock
);
1576 /* find the first packet ACK'd/NAK'd here */
1577 list_for_each(_p
, &call
->acks_pendq
) {
1578 dmsg
= list_entry(_p
, struct rxrpc_message
, link
);
1579 if (dmsg
->seq
== seq
)
1581 _debug("- %u: skipping #%u", ix
, dmsg
->seq
);
1587 _debug("- %u: processing #%u (%c) apc=%u",
1588 ix
, dmsg
->seq
, _acktype
[acks
[ix
]],
1589 call
->acks_pend_cnt
);
1591 if (acks
[ix
] == RXRPC_ACK_TYPE_ACK
) {
1592 if (dmsg
->state
== RXRPC_MSG_SENT
)
1593 call
->acks_pend_cnt
--;
1594 dmsg
->state
= RXRPC_MSG_ACKED
;
1597 if (dmsg
->state
== RXRPC_MSG_ACKED
)
1598 call
->acks_pend_cnt
++;
1599 dmsg
->state
= RXRPC_MSG_SENT
;
1604 _p
= dmsg
->link
.next
;
1605 dmsg
= list_entry(_p
, struct rxrpc_message
, link
);
1606 } while(ix
< chunk
&&
1607 _p
!= &call
->acks_pendq
&&
1613 spin_unlock(&call
->lock
);
1617 rxrpc_call_resend(call
, highest
);
1619 /* if all packets are provisionally ACK'd, then wake up anyone who's
1620 * waiting for that */
1622 spin_lock(&call
->lock
);
1623 if (call
->acks_pend_cnt
== 0) {
1624 if (call
->app_call_state
== RXRPC_CSTATE_SRVR_RCV_FINAL_ACK
) {
1625 call
->app_call_state
= RXRPC_CSTATE_COMPLETE
;
1630 spin_unlock(&call
->lock
);
1633 _debug("- wake up waiters");
1634 del_timer_sync(&call
->acks_timeout
);
1635 del_timer_sync(&call
->rcv_timeout
);
1636 del_timer_sync(&call
->ackr_dfr_timo
);
1637 call
->app_attn_func(call
);
1640 _leave(" = 0 (apc=%u)", call
->acks_pend_cnt
);
1644 panic("%s(): acks_pendq in bad state (packet #%u absent)\n",
1647 } /* end rxrpc_call_record_ACK() */
1649 /*****************************************************************************/
1651 * transfer data from the ready packet queue to the asynchronous read buffer
1652 * - since this func is the only one going to look at packets queued on
1653 * app_readyq, we don't need a lock to modify or access them, only to modify
1654 * the queue pointers
1655 * - called with call->lock held
1656 * - the buffer must be in kernel space
1658 * 0 if buffer filled
1659 * -EAGAIN if buffer not filled and more data to come
1660 * -EBADMSG if last packet received and insufficient data left
1661 * -ECONNABORTED if the call has in an error state
1663 static int __rxrpc_call_read_data(struct rxrpc_call
*call
)
1665 struct rxrpc_message
*msg
;
1669 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}",
1671 call
->app_async_read
, call
->app_read_buf
,
1672 call
->app_ready_qty
, call
->app_mark
);
1674 /* check the state */
1675 switch (call
->app_call_state
) {
1676 case RXRPC_CSTATE_SRVR_RCV_ARGS
:
1677 case RXRPC_CSTATE_CLNT_RCV_REPLY
:
1678 if (call
->app_last_rcv
) {
1679 printk("%s(%p,%p,%Zd):"
1680 " Inconsistent call state (%s, last pkt)",
1682 call
, call
->app_read_buf
, call
->app_mark
,
1683 rxrpc_call_states
[call
->app_call_state
]);
1688 case RXRPC_CSTATE_SRVR_RCV_OPID
:
1689 case RXRPC_CSTATE_SRVR_GOT_ARGS
:
1690 case RXRPC_CSTATE_CLNT_GOT_REPLY
:
1693 case RXRPC_CSTATE_SRVR_SND_REPLY
:
1694 if (!call
->app_last_rcv
) {
1695 printk("%s(%p,%p,%Zd):"
1696 " Inconsistent call state (%s, not last pkt)",
1698 call
, call
->app_read_buf
, call
->app_mark
,
1699 rxrpc_call_states
[call
->app_call_state
]);
1702 _debug("Trying to read data from call in SND_REPLY state");
1705 case RXRPC_CSTATE_ERROR
:
1706 _leave(" = -ECONNABORTED");
1707 return -ECONNABORTED
;
1710 printk("reading in unexpected state [[[ %u ]]]\n",
1711 call
->app_call_state
);
1715 /* handle the case of not having an async buffer */
1716 if (!call
->app_async_read
) {
1717 if (call
->app_mark
== RXRPC_APP_MARK_EOF
) {
1718 ret
= call
->app_last_rcv
? 0 : -EAGAIN
;
1721 if (call
->app_mark
>= call
->app_ready_qty
) {
1722 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1726 ret
= call
->app_last_rcv
? -EBADMSG
: -EAGAIN
;
1730 _leave(" = %d [no buf]", ret
);
1734 while (!list_empty(&call
->app_readyq
) && call
->app_mark
> 0) {
1735 msg
= list_entry(call
->app_readyq
.next
,
1736 struct rxrpc_message
, link
);
1738 /* drag as much data as we need out of this packet */
1739 qty
= min(call
->app_mark
, msg
->dsize
);
1741 _debug("reading %Zu from skb=%p off=%lu",
1742 qty
, msg
->pkt
, msg
->offset
);
1744 if (call
->app_read_buf
)
1745 if (skb_copy_bits(msg
->pkt
, msg
->offset
,
1746 call
->app_read_buf
, qty
) < 0)
1747 panic("%s: Failed to copy data from packet:"
1750 call
, call
->app_read_buf
, qty
);
1752 /* if that packet is now empty, discard it */
1753 call
->app_ready_qty
-= qty
;
1756 if (msg
->dsize
== 0) {
1757 list_del_init(&msg
->link
);
1758 rxrpc_put_message(msg
);
1764 call
->app_mark
-= qty
;
1765 if (call
->app_read_buf
)
1766 call
->app_read_buf
+= qty
;
1769 if (call
->app_mark
== 0) {
1770 call
->app_async_read
= 0;
1771 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1772 call
->app_read_buf
= NULL
;
1774 /* adjust the state if used up all packets */
1775 if (list_empty(&call
->app_readyq
) && call
->app_last_rcv
) {
1776 switch (call
->app_call_state
) {
1777 case RXRPC_CSTATE_SRVR_RCV_OPID
:
1778 call
->app_call_state
= RXRPC_CSTATE_SRVR_SND_REPLY
;
1779 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1781 del_timer_sync(&call
->rcv_timeout
);
1783 case RXRPC_CSTATE_SRVR_GOT_ARGS
:
1784 call
->app_call_state
= RXRPC_CSTATE_SRVR_SND_REPLY
;
1786 del_timer_sync(&call
->rcv_timeout
);
1789 call
->app_call_state
= RXRPC_CSTATE_COMPLETE
;
1791 del_timer_sync(&call
->acks_timeout
);
1792 del_timer_sync(&call
->ackr_dfr_timo
);
1793 del_timer_sync(&call
->rcv_timeout
);
1802 if (call
->app_last_rcv
) {
1803 _debug("Insufficient data (%Zu/%Zu)",
1804 call
->app_ready_qty
, call
->app_mark
);
1805 call
->app_async_read
= 0;
1806 call
->app_mark
= RXRPC_APP_MARK_EOF
;
1807 call
->app_read_buf
= NULL
;
1809 _leave(" = -EBADMSG");
1813 _leave(" = -EAGAIN");
1815 } /* end __rxrpc_call_read_data() */
1817 /*****************************************************************************/
1819 * attempt to read the specified amount of data from the call's ready queue
1820 * into the buffer provided
1821 * - since this func is the only one going to look at packets queued on
1822 * app_readyq, we don't need a lock to modify or access them, only to modify
1823 * the queue pointers
1824 * - if the buffer pointer is NULL, then data is merely drained, not copied
1825 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is
1826 * enough data or an error will be generated
1827 * - note that the caller must have added the calling task to the call's wait
1829 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this
1830 * function doesn't read all available data
1832 int rxrpc_call_read_data(struct rxrpc_call
*call
,
1833 void *buffer
, size_t size
, int flags
)
1837 _enter("%p{arq=%Zu},%p,%Zd,%x",
1838 call
, call
->app_ready_qty
, buffer
, size
, flags
);
1840 spin_lock(&call
->lock
);
1842 if (unlikely(!!call
->app_read_buf
)) {
1843 spin_unlock(&call
->lock
);
1844 _leave(" = -EBUSY");
1848 call
->app_mark
= size
;
1849 call
->app_read_buf
= buffer
;
1850 call
->app_async_read
= 1;
1851 call
->app_read_count
++;
1853 /* read as much data as possible */
1854 ret
= __rxrpc_call_read_data(call
);
1857 if (flags
& RXRPC_CALL_READ_ALL
&&
1858 (!call
->app_last_rcv
|| call
->app_ready_qty
> 0)) {
1859 _leave(" = -EBADMSG");
1860 __rxrpc_call_abort(call
, -EBADMSG
);
1864 spin_unlock(&call
->lock
);
1865 call
->app_attn_func(call
);
1870 spin_unlock(&call
->lock
);
1871 _leave(" = %d [aborted]", ret
);
1875 __rxrpc_call_abort(call
, ret
);
1876 _leave(" = %d", ret
);
1880 spin_unlock(&call
->lock
);
1882 if (!(flags
& RXRPC_CALL_READ_BLOCK
)) {
1883 _leave(" = -EAGAIN");
1887 /* wait for the data to arrive */
1888 _debug("blocking for data arrival");
1891 set_current_state(TASK_INTERRUPTIBLE
);
1892 if (!call
->app_async_read
|| signal_pending(current
))
1896 set_current_state(TASK_RUNNING
);
1898 if (signal_pending(current
)) {
1899 _leave(" = -EINTR");
1903 if (call
->app_call_state
== RXRPC_CSTATE_ERROR
) {
1904 _leave(" = -ECONNABORTED");
1905 return -ECONNABORTED
;
1912 } /* end rxrpc_call_read_data() */
/*****************************************************************************/
/*
 * write data to a call
 * - the data may not be sent immediately if it doesn't fill a buffer
 * - if we can't queue all the data for buffering now, siov[] will have been
 *   adjusted to take account of what has been sent
 */
/*
 * Queue application data on a call for transmission.
 *
 * NOTE(review): this listing is missing source lines, including most of the
 * parameter list; the names below are taken from the _enter() trace and the
 * body (sioc, siov, rxhdr_flags, alloc_flags, dup_data, size_sent) and their
 * types should be confirmed against the header.
 *
 * What the visible code does:
 * - sending is considered only in the SRVR_SND_REPLY and CLNT_SND_ARGS
 *   states; in the ERROR state the call's recorded app_errno is picked up
 * - the iov_len fields of the siov[] entries are summed into 'size'
 * - if no message is under construction (call->snd_nextmsg is NULL) one is
 *   allocated with rxrpc_conn_newmsg(); rxhdr_flags is OR'd into the header
 *   flags of the message under construction
 * - the room left in the current packet is conn->mtu_size minus the message's
 *   dsize, and each pass handles min(space, size) bytes
 * - on the copying path a buffer of at most PAGE_SIZE is kmalloc'd with
 *   alloc_flags, filled by memcpy() from siov[] and attached to the message
 *   with its dfree bit set; on the other path the caller's buffers are
 *   attached directly while the message has fewer than RXRPC_MSG_MAX_IOCS
 *   entries
 * - *size_sent is advanced by the amount attached
 * - the message is handed to rxrpc_call_flush() when it reaches the MTU or
 *   when any rxhdr_flags were supplied
 */
1921 int rxrpc_call_write_data(struct rxrpc_call
*call
,
1929 struct rxrpc_message
*msg
;
1931 size_t space
, size
, chunk
, tmp
;
1935 _enter("%p,%Zu,%p,%02x,%x,%d,%p",
1936 call
, sioc
, siov
, rxhdr_flags
, alloc_flags
, dup_data
,
1943 /* can't send more if we've sent last packet from this end */
1944 switch (call
->app_call_state
) {
1945 case RXRPC_CSTATE_SRVR_SND_REPLY
:
1946 case RXRPC_CSTATE_CLNT_SND_ARGS
:
1948 case RXRPC_CSTATE_ERROR
:
1949 ret
= call
->app_errno
;
1954 /* calculate how much data we've been given */
1956 for (; sioc
> 0; sptr
++, sioc
--) {
1960 if (!sptr
->iov_base
)
1963 size
+= sptr
->iov_len
;
1966 _debug("- size=%Zu mtu=%Zu", size
, call
->conn
->mtu_size
);
1969 /* make sure there's a message under construction */
1970 if (!call
->snd_nextmsg
) {
1971 /* no - allocate a message with no data yet attached */
1972 ret
= rxrpc_conn_newmsg(call
->conn
, call
,
1973 RXRPC_PACKET_TYPE_DATA
,
1974 0, NULL
, alloc_flags
,
1975 &call
->snd_nextmsg
);
1978 _debug("- allocated new message [ds=%Zu]",
1979 call
->snd_nextmsg
->dsize
);
1982 msg
= call
->snd_nextmsg
;
1983 msg
->hdr
.flags
|= rxhdr_flags
;
1985 /* deal with zero-length terminal packet */
1987 if (rxhdr_flags
& RXRPC_LAST_PACKET
) {
1988 ret
= rxrpc_call_flush(call
);
1995 /* work out how much space current packet has available */
1996 space
= call
->conn
->mtu_size
- msg
->dsize
;
1997 chunk
= min(space
, size
);
1999 _debug("- [before] space=%Zu chunk=%Zu", space
, chunk
);
2001 while (!siov
->iov_len
)
2004 /* if we are going to have to duplicate the data then coalesce
2007 /* don't allocate more that 1 page at a time */
2008 if (chunk
> PAGE_SIZE
)
2011 /* allocate a data buffer and attach to the message */
2012 buf
= kmalloc(chunk
, alloc_flags
);
2013 if (unlikely(!buf
)) {
2015 sizeof(struct rxrpc_header
)) {
2016 /* discard an empty msg and wind back
2017 * the seq counter */
2018 rxrpc_put_message(msg
);
2019 call
->snd_nextmsg
= NULL
;
2020 call
->snd_seq_count
--;
2027 tmp
= msg
->dcount
++;
2028 set_bit(tmp
, &msg
->dfree
);
2029 msg
->data
[tmp
].iov_base
= buf
;
2030 msg
->data
[tmp
].iov_len
= chunk
;
2031 msg
->dsize
+= chunk
;
2032 *size_sent
+= chunk
;
2035 /* load the buffer with data */
2037 tmp
= min(chunk
, siov
->iov_len
);
2038 memcpy(buf
, siov
->iov_base
, tmp
);
2040 siov
->iov_base
+= tmp
;
2041 siov
->iov_len
-= tmp
;
2048 /* we want to attach the supplied buffers directly */
2050 msg
->dcount
< RXRPC_MSG_MAX_IOCS
) {
2051 tmp
= msg
->dcount
++;
2052 msg
->data
[tmp
].iov_base
= siov
->iov_base
;
2053 msg
->data
[tmp
].iov_len
= siov
->iov_len
;
2054 msg
->dsize
+= siov
->iov_len
;
2055 *size_sent
+= siov
->iov_len
;
2056 size
-= siov
->iov_len
;
2057 chunk
-= siov
->iov_len
;
2062 _debug("- [loaded] chunk=%Zu size=%Zu", chunk
, size
);
2064 /* dispatch the message when full, final or requesting ACK */
2065 if (msg
->dsize
>= call
->conn
->mtu_size
|| rxhdr_flags
) {
2066 ret
= rxrpc_call_flush(call
);
2075 _leave(" = %d (%Zd queued, %Zd rem)", ret
, *size_sent
, size
);
2080 /*****************************************************************************/
2082 * flush outstanding packets to the network
2084 static int rxrpc_call_flush(struct rxrpc_call
*call
)
2086 struct rxrpc_message
*msg
;
2091 rxrpc_get_call(call
);
2093 /* if there's a packet under construction, then dispatch it now */
2094 if (call
->snd_nextmsg
) {
2095 msg
= call
->snd_nextmsg
;
2096 call
->snd_nextmsg
= NULL
;
2098 if (msg
->hdr
.flags
& RXRPC_LAST_PACKET
) {
2099 msg
->hdr
.flags
&= ~RXRPC_MORE_PACKETS
;
2100 if (call
->app_call_state
!= RXRPC_CSTATE_CLNT_SND_ARGS
)
2101 msg
->hdr
.flags
|= RXRPC_REQUEST_ACK
;
2104 msg
->hdr
.flags
|= RXRPC_MORE_PACKETS
;
2107 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }",
2108 msg
->dsize
, msg
->dcount
, msg
->dfree
);
2110 /* queue and adjust call state */
2111 spin_lock(&call
->lock
);
2112 list_add_tail(&msg
->link
, &call
->acks_pendq
);
2114 /* decide what to do depending on current state and if this is
2115 * the last packet */
2117 switch (call
->app_call_state
) {
2118 case RXRPC_CSTATE_SRVR_SND_REPLY
:
2119 if (msg
->hdr
.flags
& RXRPC_LAST_PACKET
) {
2120 call
->app_call_state
=
2121 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK
;
2126 case RXRPC_CSTATE_CLNT_SND_ARGS
:
2127 if (msg
->hdr
.flags
& RXRPC_LAST_PACKET
) {
2128 call
->app_call_state
=
2129 RXRPC_CSTATE_CLNT_RCV_REPLY
;
2134 case RXRPC_CSTATE_ERROR
:
2135 ret
= call
->app_errno
;
2137 spin_unlock(&call
->lock
);
2141 call
->acks_pend_cnt
++;
2143 mod_timer(&call
->acks_timeout
,
2144 __rxrpc_rtt_based_timeout(call
,
2145 rxrpc_call_acks_timeout
));
2147 spin_unlock(&call
->lock
);
2149 ret
= rxrpc_conn_sendmsg(call
->conn
, msg
);
2151 call
->pkt_snd_count
++;
2155 rxrpc_put_call(call
);
2157 _leave(" = %d", ret
);
2160 } /* end rxrpc_call_flush() */
2162 /*****************************************************************************/
2164 * resend NAK'd or unacknowledged packets up to the highest one specified
2166 static void rxrpc_call_resend(struct rxrpc_call
*call
, rxrpc_seq_t highest
)
2168 struct rxrpc_message
*msg
;
2169 struct list_head
*_p
;
2170 rxrpc_seq_t seq
= 0;
2172 _enter("%p,%u", call
, highest
);
2174 _proto("Rx Resend required");
2176 /* handle too many resends */
2177 if (call
->snd_resend_cnt
>= rxrpc_call_max_resend
) {
2178 _debug("Aborting due to too many resends (rcv=%d)",
2179 call
->pkt_rcv_count
);
2180 rxrpc_call_abort(call
,
2181 call
->pkt_rcv_count
> 0 ? -EIO
: -ETIMEDOUT
);
2186 spin_lock(&call
->lock
);
2187 call
->snd_resend_cnt
++;
2189 /* determine which the next packet we might need to ACK is */
2190 if (seq
<= call
->acks_dftv_seq
)
2191 seq
= call
->acks_dftv_seq
;
2197 /* look for the packet in the pending-ACK queue */
2198 list_for_each(_p
, &call
->acks_pendq
) {
2199 msg
= list_entry(_p
, struct rxrpc_message
, link
);
2200 if (msg
->seq
== seq
)
2205 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n",
2206 __FUNCTION__
, call
, highest
,
2207 call
->acks_dftv_seq
, call
->snd_seq_count
, seq
);
2210 if (msg
->state
!= RXRPC_MSG_SENT
)
2211 continue; /* only un-ACK'd packets */
2213 rxrpc_get_message(msg
);
2214 spin_unlock(&call
->lock
);
2216 /* send each message again (and ignore any errors we might
2218 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }",
2219 msg
->dsize
, msg
->dcount
, msg
->dfree
);
2221 if (rxrpc_conn_sendmsg(call
->conn
, msg
) == 0)
2222 call
->pkt_snd_count
++;
2224 rxrpc_put_message(msg
);
2226 spin_lock(&call
->lock
);
2229 /* reset the timeout */
2230 mod_timer(&call
->acks_timeout
,
2231 __rxrpc_rtt_based_timeout(call
, rxrpc_call_acks_timeout
));
2233 spin_unlock(&call
->lock
);
2236 } /* end rxrpc_call_resend() */
2238 /*****************************************************************************/
2240 * handle an ICMP error being applied to a call
2242 void rxrpc_call_handle_error(struct rxrpc_call
*call
, int local
, int errno
)
2244 _enter("%p{%u},%d", call
, ntohl(call
->call_id
), errno
);
2246 /* if this call is already aborted, then just wake up any waiters */
2247 if (call
->app_call_state
== RXRPC_CSTATE_ERROR
) {
2248 call
->app_error_func(call
);
2251 /* tell the app layer what happened */
2252 spin_lock(&call
->lock
);
2253 call
->app_call_state
= RXRPC_CSTATE_ERROR
;
2256 call
->app_err_state
= RXRPC_ESTATE_LOCAL_ERROR
;
2258 call
->app_err_state
= RXRPC_ESTATE_REMOTE_ERROR
;
2259 call
->app_errno
= errno
;
2260 call
->app_mark
= RXRPC_APP_MARK_EOF
;
2261 call
->app_read_buf
= NULL
;
2262 call
->app_async_read
= 0;
2265 call
->app_aemap_func(call
);
2267 del_timer_sync(&call
->acks_timeout
);
2268 del_timer_sync(&call
->rcv_timeout
);
2269 del_timer_sync(&call
->ackr_dfr_timo
);
2271 spin_unlock(&call
->lock
);
2273 call
->app_error_func(call
);
2277 } /* end rxrpc_call_handle_error() */