1 /* RxRPC individual remote procedure call handling
3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
14 #include <linux/slab.h>
15 #include <linux/module.h>
16 #include <linux/circ_buf.h>
17 #include <linux/spinlock_types.h>
19 #include <net/af_rxrpc.h>
20 #include "ar-internal.h"
22 const char *const rxrpc_call_states
[NR__RXRPC_CALL_STATES
] = {
23 [RXRPC_CALL_UNINITIALISED
] = "Uninit ",
24 [RXRPC_CALL_CLIENT_AWAIT_CONN
] = "ClWtConn",
25 [RXRPC_CALL_CLIENT_SEND_REQUEST
] = "ClSndReq",
26 [RXRPC_CALL_CLIENT_AWAIT_REPLY
] = "ClAwtRpl",
27 [RXRPC_CALL_CLIENT_RECV_REPLY
] = "ClRcvRpl",
28 [RXRPC_CALL_SERVER_PREALLOC
] = "SvPrealc",
29 [RXRPC_CALL_SERVER_SECURING
] = "SvSecure",
30 [RXRPC_CALL_SERVER_ACCEPTING
] = "SvAccept",
31 [RXRPC_CALL_SERVER_RECV_REQUEST
] = "SvRcvReq",
32 [RXRPC_CALL_SERVER_ACK_REQUEST
] = "SvAckReq",
33 [RXRPC_CALL_SERVER_SEND_REPLY
] = "SvSndRpl",
34 [RXRPC_CALL_SERVER_AWAIT_ACK
] = "SvAwtACK",
35 [RXRPC_CALL_COMPLETE
] = "Complete",
38 const char *const rxrpc_call_completions
[NR__RXRPC_CALL_COMPLETIONS
] = {
39 [RXRPC_CALL_SUCCEEDED
] = "Complete",
40 [RXRPC_CALL_REMOTELY_ABORTED
] = "RmtAbort",
41 [RXRPC_CALL_LOCALLY_ABORTED
] = "LocAbort",
42 [RXRPC_CALL_LOCAL_ERROR
] = "LocError",
43 [RXRPC_CALL_NETWORK_ERROR
] = "NetError",
46 struct kmem_cache
*rxrpc_call_jar
;
48 static void rxrpc_call_timer_expired(unsigned long _call
)
50 struct rxrpc_call
*call
= (struct rxrpc_call
*)_call
;
52 _enter("%d", call
->debug_id
);
54 if (call
->state
< RXRPC_CALL_COMPLETE
)
55 rxrpc_set_timer(call
, rxrpc_timer_expired
, ktime_get_real());
58 static struct lock_class_key rxrpc_call_user_mutex_lock_class_key
;
61 * find an extant server call
62 * - called in process context with IRQs enabled
64 struct rxrpc_call
*rxrpc_find_call_by_user_ID(struct rxrpc_sock
*rx
,
65 unsigned long user_call_ID
)
67 struct rxrpc_call
*call
;
70 _enter("%p,%lx", rx
, user_call_ID
);
72 read_lock(&rx
->call_lock
);
74 p
= rx
->calls
.rb_node
;
76 call
= rb_entry(p
, struct rxrpc_call
, sock_node
);
78 if (user_call_ID
< call
->user_call_ID
)
80 else if (user_call_ID
> call
->user_call_ID
)
83 goto found_extant_call
;
86 read_unlock(&rx
->call_lock
);
91 rxrpc_get_call(call
, rxrpc_call_got
);
92 read_unlock(&rx
->call_lock
);
93 _leave(" = %p [%d]", call
, atomic_read(&call
->usage
));
100 struct rxrpc_call
*rxrpc_alloc_call(struct rxrpc_sock
*rx
, gfp_t gfp
)
102 struct rxrpc_call
*call
;
104 call
= kmem_cache_zalloc(rxrpc_call_jar
, gfp
);
108 call
->rxtx_buffer
= kcalloc(RXRPC_RXTX_BUFF_SIZE
,
109 sizeof(struct sk_buff
*),
111 if (!call
->rxtx_buffer
)
114 call
->rxtx_annotations
= kcalloc(RXRPC_RXTX_BUFF_SIZE
, sizeof(u8
), gfp
);
115 if (!call
->rxtx_annotations
)
118 mutex_init(&call
->user_mutex
);
120 /* Prevent lockdep reporting a deadlock false positive between the afs
121 * filesystem and sys_sendmsg() via the mmap sem.
123 if (rx
->sk
.sk_kern_sock
)
124 lockdep_set_class(&call
->user_mutex
,
125 &rxrpc_call_user_mutex_lock_class_key
);
127 setup_timer(&call
->timer
, rxrpc_call_timer_expired
,
128 (unsigned long)call
);
129 INIT_WORK(&call
->processor
, &rxrpc_process_call
);
130 INIT_LIST_HEAD(&call
->link
);
131 INIT_LIST_HEAD(&call
->chan_wait_link
);
132 INIT_LIST_HEAD(&call
->accept_link
);
133 INIT_LIST_HEAD(&call
->recvmsg_link
);
134 INIT_LIST_HEAD(&call
->sock_link
);
135 init_waitqueue_head(&call
->waitq
);
136 spin_lock_init(&call
->lock
);
137 rwlock_init(&call
->state_lock
);
138 atomic_set(&call
->usage
, 1);
139 call
->debug_id
= atomic_inc_return(&rxrpc_debug_id
);
140 call
->tx_total_len
= -1;
142 memset(&call
->sock_node
, 0xed, sizeof(call
->sock_node
));
144 /* Leave space in the ring to handle a maxed-out jumbo packet */
145 call
->rx_winsize
= rxrpc_rx_window_size
;
146 call
->tx_winsize
= 16;
147 call
->rx_expect_next
= 1;
150 call
->cong_ssthresh
= RXRPC_RXTX_BUFF_SIZE
- 1;
154 kfree(call
->rxtx_buffer
);
156 kmem_cache_free(rxrpc_call_jar
, call
);
161 * Allocate a new client call.
163 static struct rxrpc_call
*rxrpc_alloc_client_call(struct rxrpc_sock
*rx
,
164 struct sockaddr_rxrpc
*srx
,
167 struct rxrpc_call
*call
;
172 call
= rxrpc_alloc_call(rx
, gfp
);
174 return ERR_PTR(-ENOMEM
);
175 call
->state
= RXRPC_CALL_CLIENT_AWAIT_CONN
;
176 call
->service_id
= srx
->srx_service
;
177 call
->tx_phase
= true;
178 now
= ktime_get_real();
179 call
->acks_latest_ts
= now
;
180 call
->cong_tstamp
= now
;
182 _leave(" = %p", call
);
187 * Initiate the call ack/resend/expiry timer.
189 static void rxrpc_start_call_timer(struct rxrpc_call
*call
)
191 ktime_t now
= ktime_get_real(), expire_at
;
193 expire_at
= ktime_add_ms(now
, rxrpc_max_call_lifetime
);
194 call
->expire_at
= expire_at
;
195 call
->ack_at
= expire_at
;
196 call
->ping_at
= expire_at
;
197 call
->resend_at
= expire_at
;
198 call
->timer
.expires
= jiffies
+ LONG_MAX
/ 2;
199 rxrpc_set_timer(call
, rxrpc_timer_begin
, now
);
203 * Set up a call for the given parameters.
204 * - Called with the socket lock held, which it must release.
205 * - If it returns a call, the call's lock will need releasing by the caller.
207 struct rxrpc_call
*rxrpc_new_client_call(struct rxrpc_sock
*rx
,
208 struct rxrpc_conn_parameters
*cp
,
209 struct sockaddr_rxrpc
*srx
,
210 unsigned long user_call_ID
,
213 __releases(&rx
->sk
.sk_lock
.slock
)
215 struct rxrpc_call
*call
, *xcall
;
216 struct rxrpc_net
*rxnet
= rxrpc_net(sock_net(&rx
->sk
));
217 struct rb_node
*parent
, **pp
;
218 const void *here
= __builtin_return_address(0);
221 _enter("%p,%lx", rx
, user_call_ID
);
223 call
= rxrpc_alloc_client_call(rx
, srx
, gfp
);
225 release_sock(&rx
->sk
);
226 _leave(" = %ld", PTR_ERR(call
));
230 call
->tx_total_len
= tx_total_len
;
231 trace_rxrpc_call(call
, rxrpc_call_new_client
, atomic_read(&call
->usage
),
232 here
, (const void *)user_call_ID
);
234 /* We need to protect a partially set up call against the user as we
235 * will be acting outside the socket lock.
237 mutex_lock(&call
->user_mutex
);
239 /* Publish the call, even though it is incompletely set up as yet */
240 write_lock(&rx
->call_lock
);
242 pp
= &rx
->calls
.rb_node
;
246 xcall
= rb_entry(parent
, struct rxrpc_call
, sock_node
);
248 if (user_call_ID
< xcall
->user_call_ID
)
249 pp
= &(*pp
)->rb_left
;
250 else if (user_call_ID
> xcall
->user_call_ID
)
251 pp
= &(*pp
)->rb_right
;
253 goto error_dup_user_ID
;
256 rcu_assign_pointer(call
->socket
, rx
);
257 call
->user_call_ID
= user_call_ID
;
258 __set_bit(RXRPC_CALL_HAS_USERID
, &call
->flags
);
259 rxrpc_get_call(call
, rxrpc_call_got_userid
);
260 rb_link_node(&call
->sock_node
, parent
, pp
);
261 rb_insert_color(&call
->sock_node
, &rx
->calls
);
262 list_add(&call
->sock_link
, &rx
->sock_calls
);
264 write_unlock(&rx
->call_lock
);
266 write_lock(&rxnet
->call_lock
);
267 list_add_tail(&call
->link
, &rxnet
->calls
);
268 write_unlock(&rxnet
->call_lock
);
270 /* From this point on, the call is protected by its own lock. */
271 release_sock(&rx
->sk
);
273 /* Set up or get a connection record and set the protocol parameters,
274 * including channel number and call ID.
276 ret
= rxrpc_connect_call(call
, cp
, srx
, gfp
);
280 trace_rxrpc_call(call
, rxrpc_call_connected
, atomic_read(&call
->usage
),
283 rxrpc_start_call_timer(call
);
285 _net("CALL new %d on CONN %d", call
->debug_id
, call
->conn
->debug_id
);
287 _leave(" = %p [new]", call
);
290 /* We unexpectedly found the user ID in the list after taking
291 * the call_lock. This shouldn't happen unless the user races
292 * with itself and tries to add the same user ID twice at the
293 * same time in different threads.
296 write_unlock(&rx
->call_lock
);
297 release_sock(&rx
->sk
);
301 __rxrpc_set_call_completion(call
, RXRPC_CALL_LOCAL_ERROR
,
303 trace_rxrpc_call(call
, rxrpc_call_error
, atomic_read(&call
->usage
),
305 rxrpc_release_call(rx
, call
);
306 mutex_unlock(&call
->user_mutex
);
307 rxrpc_put_call(call
, rxrpc_call_put
);
308 _leave(" = %d", ret
);
313 * Retry a call to a new address. It is expected that the Tx queue of the call
314 * will contain data previously packaged for an old call.
316 int rxrpc_retry_client_call(struct rxrpc_sock
*rx
,
317 struct rxrpc_call
*call
,
318 struct rxrpc_conn_parameters
*cp
,
319 struct sockaddr_rxrpc
*srx
,
322 const void *here
= __builtin_return_address(0);
325 /* Set up or get a connection record and set the protocol parameters,
326 * including channel number and call ID.
328 ret
= rxrpc_connect_call(call
, cp
, srx
, gfp
);
332 trace_rxrpc_call(call
, rxrpc_call_connected
, atomic_read(&call
->usage
),
335 rxrpc_start_call_timer(call
);
337 _net("CALL new %d on CONN %d", call
->debug_id
, call
->conn
->debug_id
);
339 if (!test_and_set_bit(RXRPC_CALL_EV_RESEND
, &call
->events
))
340 rxrpc_queue_call(call
);
346 rxrpc_set_call_completion(call
, RXRPC_CALL_LOCAL_ERROR
,
348 trace_rxrpc_call(call
, rxrpc_call_error
, atomic_read(&call
->usage
),
350 _leave(" = %d", ret
);
355 * Set up an incoming call. call->conn points to the connection.
356 * This is called in BH context and isn't allowed to fail.
358 void rxrpc_incoming_call(struct rxrpc_sock
*rx
,
359 struct rxrpc_call
*call
,
362 struct rxrpc_connection
*conn
= call
->conn
;
363 struct rxrpc_skb_priv
*sp
= rxrpc_skb(skb
);
366 _enter(",%d", call
->conn
->debug_id
);
368 rcu_assign_pointer(call
->socket
, rx
);
369 call
->call_id
= sp
->hdr
.callNumber
;
370 call
->service_id
= sp
->hdr
.serviceId
;
371 call
->cid
= sp
->hdr
.cid
;
372 call
->state
= RXRPC_CALL_SERVER_ACCEPTING
;
373 if (sp
->hdr
.securityIndex
> 0)
374 call
->state
= RXRPC_CALL_SERVER_SECURING
;
375 call
->cong_tstamp
= skb
->tstamp
;
377 /* Set the channel for this call. We don't get channel_lock as we're
378 * only defending against the data_ready handler (which we're called
379 * from) and the RESPONSE packet parser (which is only really
380 * interested in call_counter and can cope with a disagreement with the
383 chan
= sp
->hdr
.cid
& RXRPC_CHANNELMASK
;
384 conn
->channels
[chan
].call_counter
= call
->call_id
;
385 conn
->channels
[chan
].call_id
= call
->call_id
;
386 rcu_assign_pointer(conn
->channels
[chan
].call
, call
);
388 spin_lock(&conn
->params
.peer
->lock
);
389 hlist_add_head(&call
->error_link
, &conn
->params
.peer
->error_targets
);
390 spin_unlock(&conn
->params
.peer
->lock
);
392 _net("CALL incoming %d on CONN %d", call
->debug_id
, call
->conn
->debug_id
);
394 rxrpc_start_call_timer(call
);
399 * Queue a call's work processor, getting a ref to pass to the work queue.
401 bool rxrpc_queue_call(struct rxrpc_call
*call
)
403 const void *here
= __builtin_return_address(0);
404 int n
= __atomic_add_unless(&call
->usage
, 1, 0);
407 if (rxrpc_queue_work(&call
->processor
))
408 trace_rxrpc_call(call
, rxrpc_call_queued
, n
+ 1, here
, NULL
);
410 rxrpc_put_call(call
, rxrpc_call_put_noqueue
);
415 * Queue a call's work processor, passing the callers ref to the work queue.
417 bool __rxrpc_queue_call(struct rxrpc_call
*call
)
419 const void *here
= __builtin_return_address(0);
420 int n
= atomic_read(&call
->usage
);
422 if (rxrpc_queue_work(&call
->processor
))
423 trace_rxrpc_call(call
, rxrpc_call_queued_ref
, n
, here
, NULL
);
425 rxrpc_put_call(call
, rxrpc_call_put_noqueue
);
430 * Note the re-emergence of a call.
432 void rxrpc_see_call(struct rxrpc_call
*call
)
434 const void *here
= __builtin_return_address(0);
436 int n
= atomic_read(&call
->usage
);
438 trace_rxrpc_call(call
, rxrpc_call_seen
, n
, here
, NULL
);
443 * Note the addition of a ref on a call.
445 void rxrpc_get_call(struct rxrpc_call
*call
, enum rxrpc_call_trace op
)
447 const void *here
= __builtin_return_address(0);
448 int n
= atomic_inc_return(&call
->usage
);
450 trace_rxrpc_call(call
, op
, n
, here
, NULL
);
454 * Detach a call from its owning socket.
456 void rxrpc_release_call(struct rxrpc_sock
*rx
, struct rxrpc_call
*call
)
458 const void *here
= __builtin_return_address(0);
459 struct rxrpc_connection
*conn
= call
->conn
;
463 _enter("{%d,%d}", call
->debug_id
, atomic_read(&call
->usage
));
465 trace_rxrpc_call(call
, rxrpc_call_release
, atomic_read(&call
->usage
),
466 here
, (const void *)call
->flags
);
468 ASSERTCMP(call
->state
, ==, RXRPC_CALL_COMPLETE
);
470 spin_lock_bh(&call
->lock
);
471 if (test_and_set_bit(RXRPC_CALL_RELEASED
, &call
->flags
))
473 spin_unlock_bh(&call
->lock
);
475 del_timer_sync(&call
->timer
);
477 /* Make sure we don't get any more notifications */
478 write_lock_bh(&rx
->recvmsg_lock
);
480 if (!list_empty(&call
->recvmsg_link
)) {
481 _debug("unlinking once-pending call %p { e=%lx f=%lx }",
482 call
, call
->events
, call
->flags
);
483 list_del(&call
->recvmsg_link
);
487 /* list_empty() must return false in rxrpc_notify_socket() */
488 call
->recvmsg_link
.next
= NULL
;
489 call
->recvmsg_link
.prev
= NULL
;
491 write_unlock_bh(&rx
->recvmsg_lock
);
493 rxrpc_put_call(call
, rxrpc_call_put
);
495 write_lock(&rx
->call_lock
);
497 if (test_and_clear_bit(RXRPC_CALL_HAS_USERID
, &call
->flags
)) {
498 rb_erase(&call
->sock_node
, &rx
->calls
);
499 memset(&call
->sock_node
, 0xdd, sizeof(call
->sock_node
));
500 rxrpc_put_call(call
, rxrpc_call_put_userid
);
503 list_del(&call
->sock_link
);
504 write_unlock(&rx
->call_lock
);
506 _debug("RELEASE CALL %p (%d CONN %p)", call
, call
->debug_id
, conn
);
509 rxrpc_disconnect_call(call
);
511 for (i
= 0; i
< RXRPC_RXTX_BUFF_SIZE
; i
++) {
512 rxrpc_free_skb(call
->rxtx_buffer
[i
],
513 (call
->tx_phase
? rxrpc_skb_tx_cleaned
:
514 rxrpc_skb_rx_cleaned
));
515 call
->rxtx_buffer
[i
] = NULL
;
522 * Prepare a kernel service call for retry.
524 int rxrpc_prepare_call_for_retry(struct rxrpc_sock
*rx
, struct rxrpc_call
*call
)
526 const void *here
= __builtin_return_address(0);
530 _enter("{%d,%d}", call
->debug_id
, atomic_read(&call
->usage
));
532 trace_rxrpc_call(call
, rxrpc_call_release
, atomic_read(&call
->usage
),
533 here
, (const void *)call
->flags
);
535 ASSERTCMP(call
->state
, ==, RXRPC_CALL_COMPLETE
);
536 ASSERTCMP(call
->completion
, !=, RXRPC_CALL_REMOTELY_ABORTED
);
537 ASSERTCMP(call
->completion
, !=, RXRPC_CALL_LOCALLY_ABORTED
);
538 ASSERT(list_empty(&call
->recvmsg_link
));
540 del_timer_sync(&call
->timer
);
542 _debug("RELEASE CALL %p (%d CONN %p)", call
, call
->debug_id
, call
->conn
);
545 rxrpc_disconnect_call(call
);
547 if (rxrpc_is_service_call(call
) ||
549 call
->tx_hard_ack
!= 0 ||
550 call
->rx_hard_ack
!= 0 ||
554 call
->state
= RXRPC_CALL_UNINITIALISED
;
555 call
->completion
= RXRPC_CALL_SUCCEEDED
;
559 call
->cong_extra
= 0;
560 call
->cong_ssthresh
= 0;
562 call
->cong_dup_acks
= 0;
563 call
->cong_cumul_acks
= 0;
564 call
->acks_lowest_nak
= 0;
566 for (i
= 0; i
< RXRPC_RXTX_BUFF_SIZE
; i
++) {
567 last
|= call
->rxtx_annotations
[i
];
568 call
->rxtx_annotations
[i
] &= RXRPC_TX_ANNO_LAST
;
569 call
->rxtx_annotations
[i
] |= RXRPC_TX_ANNO_RETRANS
;
577 * release all the calls associated with a socket
579 void rxrpc_release_calls_on_socket(struct rxrpc_sock
*rx
)
581 struct rxrpc_call
*call
;
585 while (!list_empty(&rx
->to_be_accepted
)) {
586 call
= list_entry(rx
->to_be_accepted
.next
,
587 struct rxrpc_call
, accept_link
);
588 list_del(&call
->accept_link
);
589 rxrpc_abort_call("SKR", call
, 0, RX_CALL_DEAD
, -ECONNRESET
);
590 rxrpc_put_call(call
, rxrpc_call_put
);
593 while (!list_empty(&rx
->sock_calls
)) {
594 call
= list_entry(rx
->sock_calls
.next
,
595 struct rxrpc_call
, sock_link
);
596 rxrpc_get_call(call
, rxrpc_call_got
);
597 rxrpc_abort_call("SKT", call
, 0, RX_CALL_DEAD
, -ECONNRESET
);
598 rxrpc_send_abort_packet(call
);
599 rxrpc_release_call(rx
, call
);
600 rxrpc_put_call(call
, rxrpc_call_put
);
609 void rxrpc_put_call(struct rxrpc_call
*call
, enum rxrpc_call_trace op
)
611 struct rxrpc_net
*rxnet
;
612 const void *here
= __builtin_return_address(0);
615 ASSERT(call
!= NULL
);
617 n
= atomic_dec_return(&call
->usage
);
618 trace_rxrpc_call(call
, op
, n
, here
, NULL
);
621 _debug("call %d dead", call
->debug_id
);
622 ASSERTCMP(call
->state
, ==, RXRPC_CALL_COMPLETE
);
624 if (!list_empty(&call
->link
)) {
625 rxnet
= rxrpc_net(sock_net(&call
->socket
->sk
));
626 write_lock(&rxnet
->call_lock
);
627 list_del_init(&call
->link
);
628 write_unlock(&rxnet
->call_lock
);
631 rxrpc_cleanup_call(call
);
636 * Final call destruction under RCU.
638 static void rxrpc_rcu_destroy_call(struct rcu_head
*rcu
)
640 struct rxrpc_call
*call
= container_of(rcu
, struct rxrpc_call
, rcu
);
642 rxrpc_put_peer(call
->peer
);
643 kfree(call
->rxtx_buffer
);
644 kfree(call
->rxtx_annotations
);
645 kmem_cache_free(rxrpc_call_jar
, call
);
651 void rxrpc_cleanup_call(struct rxrpc_call
*call
)
655 _net("DESTROY CALL %d", call
->debug_id
);
657 memset(&call
->sock_node
, 0xcd, sizeof(call
->sock_node
));
659 del_timer_sync(&call
->timer
);
661 ASSERTCMP(call
->state
, ==, RXRPC_CALL_COMPLETE
);
662 ASSERT(test_bit(RXRPC_CALL_RELEASED
, &call
->flags
));
663 ASSERTCMP(call
->conn
, ==, NULL
);
665 /* Clean up the Rx/Tx buffer */
666 for (i
= 0; i
< RXRPC_RXTX_BUFF_SIZE
; i
++)
667 rxrpc_free_skb(call
->rxtx_buffer
[i
],
668 (call
->tx_phase
? rxrpc_skb_tx_cleaned
:
669 rxrpc_skb_rx_cleaned
));
671 rxrpc_free_skb(call
->tx_pending
, rxrpc_skb_tx_cleaned
);
673 call_rcu(&call
->rcu
, rxrpc_rcu_destroy_call
);
677 * Make sure that all calls are gone from a network namespace. To reach this
678 * point, any open UDP sockets in that namespace must have been closed, so any
679 * outstanding calls cannot be doing I/O.
681 void rxrpc_destroy_all_calls(struct rxrpc_net
*rxnet
)
683 struct rxrpc_call
*call
;
687 if (!list_empty(&rxnet
->calls
)) {
688 write_lock(&rxnet
->call_lock
);
690 while (!list_empty(&rxnet
->calls
)) {
691 call
= list_entry(rxnet
->calls
.next
,
692 struct rxrpc_call
, link
);
693 _debug("Zapping call %p", call
);
695 rxrpc_see_call(call
);
696 list_del_init(&call
->link
);
698 pr_err("Call %p still in use (%d,%s,%lx,%lx)!\n",
699 call
, atomic_read(&call
->usage
),
700 rxrpc_call_states
[call
->state
],
701 call
->flags
, call
->events
);
703 write_unlock(&rxnet
->call_lock
);
705 write_lock(&rxnet
->call_lock
);
708 write_unlock(&rxnet
->call_lock
);