/*
 * net/tipc/server.c: TIPC server infrastructure
 *
 * Copyright (c) 2012-2013, Wind River Systems
 * Copyright (c) 2017-2018, Ericsson AB
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#include "subscr.h"
#include "topsrv.h"
#include "core.h"
#include "socket.h"
#include "addr.h"
#include "msg.h"
#include <net/sock.h>
#include <linux/module.h>
/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT	25
#define MAX_RECV_MSG_COUNT	25
#define CF_CONNECTED		1

#define TIPC_SERVER_NAME_LEN	32
/**
 * struct tipc_topsrv - TIPC server structure
 * @conn_idr: identifier set of connections
 * @idr_lock: protects the connection identifier set
 * @idr_in_use: number of allocated identifier entries
 * @net: network namespace instance
 * @awork: accept work item
 * @rcv_wq: receive workqueue
 * @send_wq: send workqueue
 * @max_rcvbuf_size: maximum permitted receive message length
 * @listener: topology server listener socket
 * @name: server name
 */
struct tipc_topsrv {
	struct idr conn_idr;
	spinlock_t idr_lock; /* for idr list */
	int idr_in_use;
	struct net *net;
	struct work_struct awork;
	struct workqueue_struct *rcv_wq;
	struct workqueue_struct *send_wq;
	int max_rcvbuf_size;
	struct socket *listener;
	char name[TIPC_SERVER_NAME_LEN];
};
/**
 * struct tipc_conn - TIPC connection structure
 * @kref: reference counter to connection object
 * @conid: connection identifier
 * @sock: socket handler associated with connection
 * @flags: indicates connection state
 * @server: pointer to connected server
 * @sub_list: list of all pertaining subscriptions
 * @sub_lock: lock protecting the subscription list
 * @rwork: receive work item
 * @outqueue: pointer to first outbound message in queue
 * @outqueue_lock: control access to the outqueue
 * @swork: send work item
 */
struct tipc_conn {
	struct kref kref;
	int conid;
	struct socket *sock;
	unsigned long flags;
	struct tipc_topsrv *server;
	struct list_head sub_list;
	spinlock_t sub_lock; /* for subscription list */
	struct work_struct rwork;
	struct list_head outqueue;
	spinlock_t outqueue_lock; /* for outqueue */
	struct work_struct swork;
};
/* An entry waiting to be sent */
struct outqueue_entry {
	bool inactive;
	struct tipc_event evt;
	struct list_head list;
};
static void tipc_conn_recv_work(struct work_struct *work);
static void tipc_conn_send_work(struct work_struct *work);
static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt);
static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s);
static bool connected(struct tipc_conn *con)
{
	return con && test_bit(CF_CONNECTED, &con->flags);
}
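/* tipc_conn_kref_release - last reference to the connection dropped:
 * unhook it from the identifier set, release its socket and free all
 * events still waiting in the outqueue
 */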
static void tipc_conn_kref_release(struct kref *kref)
{
	struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
	struct tipc_topsrv *s = con->server;
	struct outqueue_entry *e, *safe;

	spin_lock_bh(&s->idr_lock);
	idr_remove(&s->conn_idr, con->conid);
	s->idr_in_use--;
	spin_unlock_bh(&s->idr_lock);
	if (con->sock)
		sock_release(con->sock);

	spin_lock_bh(&con->outqueue_lock);
	list_for_each_entry_safe(e, safe, &con->outqueue, list) {
		list_del(&e->list);
		kfree(e);
	}
	spin_unlock_bh(&con->outqueue_lock);
	kfree(con);
}
static void conn_put(struct tipc_conn *con)
{
	kref_put(&con->kref, tipc_conn_kref_release);
}
static void conn_get(struct tipc_conn *con)
{
	kref_get(&con->kref);
}
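/* tipc_conn_close - shut down a connection. CF_CONNECTED is cleared under
 * sk_callback_lock, so when the send and receive paths race to close the
 * connection only one of them performs the actual teardown.
 */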
static void tipc_conn_close(struct tipc_conn *con)
{
	struct sock *sk = con->sock->sk;
	bool disconnect = false;

	write_lock_bh(&sk->sk_callback_lock);
	disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);

	if (disconnect) {
		sk->sk_user_data = NULL;
		tipc_conn_delete_sub(con, NULL);
	}
	write_unlock_bh(&sk->sk_callback_lock);

	/* Handle concurrent calls from sending and receiving threads */
	if (!disconnect)
		return;

	/* Don't flush pending works, just let them expire */
	kernel_sock_shutdown(con->sock, SHUT_RDWR);

	conn_put(con);
}
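/* tipc_conn_alloc - allocate a connection object and register it in the
 * server's identifier set; the assigned idr slot becomes the connection id
 */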
static struct tipc_conn *tipc_conn_alloc(struct tipc_topsrv *s)
{
	struct tipc_conn *con;
	int ret;

	con = kzalloc(sizeof(*con), GFP_ATOMIC);
	if (!con)
		return ERR_PTR(-ENOMEM);

	kref_init(&con->kref);
	INIT_LIST_HEAD(&con->outqueue);
	INIT_LIST_HEAD(&con->sub_list);
	spin_lock_init(&con->outqueue_lock);
	spin_lock_init(&con->sub_lock);
	INIT_WORK(&con->swork, tipc_conn_send_work);
	INIT_WORK(&con->rwork, tipc_conn_recv_work);

	spin_lock_bh(&s->idr_lock);
	ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
	if (ret < 0) {
		kfree(con);
		spin_unlock_bh(&s->idr_lock);
		return ERR_PTR(-ENOMEM);
	}
	con->conid = ret;
	s->idr_in_use++;
	spin_unlock_bh(&s->idr_lock);

	set_bit(CF_CONNECTED, &con->flags);
	con->server = s;

	return con;
}
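/* tipc_conn_lookup - map a connection id to its connection object; takes a
 * reference on success, which the caller must balance with conn_put()
 */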
static struct tipc_conn *tipc_conn_lookup(struct tipc_topsrv *s, int conid)
{
	struct tipc_conn *con;

	spin_lock_bh(&s->idr_lock);
	con = idr_find(&s->conn_idr, conid);
	if (!connected(con) || !kref_get_unless_zero(&con->kref))
		con = NULL;
	spin_unlock_bh(&s->idr_lock);
	return con;
}
/* tipc_conn_delete_sub - delete a specific or all subscriptions
 * for a given subscriber
 */
static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
{
	struct tipc_net *tn = tipc_net(con->server->net);
	struct list_head *sub_list = &con->sub_list;
	struct tipc_subscription *sub, *tmp;

	spin_lock_bh(&con->sub_lock);
	list_for_each_entry_safe(sub, tmp, sub_list, sub_list) {
		if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) {
			tipc_sub_unsubscribe(sub);
			atomic_dec(&tn->subscription_count);
		} else if (s) {
			break;
		}
	}
	spin_unlock_bh(&con->sub_lock);
}
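/* tipc_conn_send_to_sock - drain the connection's outqueue. Each event is
 * pushed to the peer with kernel_sendmsg(), or, for a kernel-internal
 * connection without a socket, delivered via tipc_topsrv_kern_evt().
 */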
static void tipc_conn_send_to_sock(struct tipc_conn *con)
{
	struct list_head *queue = &con->outqueue;
	struct tipc_topsrv *srv = con->server;
	struct outqueue_entry *e;
	struct tipc_event *evt;
	struct msghdr msg;
	struct kvec iov;
	int count = 0;
	int ret;

	spin_lock_bh(&con->outqueue_lock);

	while (!list_empty(queue)) {
		e = list_first_entry(queue, struct outqueue_entry, list);
		evt = &e->evt;
		spin_unlock_bh(&con->outqueue_lock);

		if (e->inactive)
			tipc_conn_delete_sub(con, &evt->s);

		memset(&msg, 0, sizeof(msg));
		msg.msg_flags = MSG_DONTWAIT;
		iov.iov_base = evt;
		iov.iov_len = sizeof(*evt);
		msg.msg_name = NULL;

		if (con->sock) {
			ret = kernel_sendmsg(con->sock, &msg, &iov,
					     1, sizeof(*evt));
			if (ret == -EWOULDBLOCK || ret == 0) {
				cond_resched();
				return;
			} else if (ret < 0) {
				return tipc_conn_close(con);
			}
		} else {
			tipc_topsrv_kern_evt(srv->net, evt);
		}

		/* Don't starve users filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}
		spin_lock_bh(&con->outqueue_lock);
		list_del(&e->list);
		kfree(e);
	}
	spin_unlock_bh(&con->outqueue_lock);
}
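/* tipc_conn_send_work - workqueue handler feeding queued events to the
 * socket; drops the reference taken when the work was queued
 */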
static void tipc_conn_send_work(struct work_struct *work)
{
	struct tipc_conn *con = container_of(work, struct tipc_conn, swork);

	if (connected(con))
		tipc_conn_send_to_sock(con);

	conn_put(con);
}
/* tipc_topsrv_queue_evt() - interrupt level call from a subscription instance
 * The queued work is launched into tipc_conn_send_work()->tipc_conn_send_to_sock()
 */
void tipc_topsrv_queue_evt(struct net *net, int conid,
			   u32 event, struct tipc_event *evt)
{
	struct tipc_topsrv *srv = tipc_topsrv(net);
	struct outqueue_entry *e;
	struct tipc_conn *con;

	con = tipc_conn_lookup(srv, conid);
	if (!con)
		return;

	if (!connected(con))
		goto err;

	e = kmalloc(sizeof(*e), GFP_ATOMIC);
	if (!e)
		goto err;
	e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
	memcpy(&e->evt, evt, sizeof(*evt));
	spin_lock_bh(&con->outqueue_lock);
	list_add_tail(&e->list, &con->outqueue);
	spin_unlock_bh(&con->outqueue_lock);

	if (queue_work(srv->send_wq, &con->swork))
		return;
err:
	conn_put(con);
}
/* tipc_conn_write_space - interrupt callback after a sendmsg EAGAIN
 * Indicates that there now is more space in the send buffer
 * The queued work is launched into tipc_conn_send_work()->tipc_conn_send_to_sock()
 */
static void tipc_conn_write_space(struct sock *sk)
{
	struct tipc_conn *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sk->sk_user_data;
	if (connected(con)) {
		conn_get(con);
		if (!queue_work(con->server->send_wq, &con->swork))
			conn_put(con);
	}
	read_unlock_bh(&sk->sk_callback_lock);
}
static int tipc_conn_rcv_sub(struct tipc_topsrv *srv,
			     struct tipc_conn *con,
			     struct tipc_subscr *s)
{
	struct tipc_net *tn = tipc_net(srv->net);
	struct tipc_subscription *sub;

	if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) {
		s->filter &= __constant_ntohl(~TIPC_SUB_CANCEL);
		tipc_conn_delete_sub(con, s);
		return 0;
	}
	if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) {
		pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR);
		return -1;
	}
	sub = tipc_sub_subscribe(srv->net, s, con->conid);
	if (!sub)
		return -1;
	atomic_inc(&tn->subscription_count);
	spin_lock_bh(&con->sub_lock);
	list_add(&sub->sub_list, &con->sub_list);
	spin_unlock_bh(&con->sub_lock);
	return 0;
}
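/* tipc_conn_rcv_from_sock - read one subscription request from the peer
 * socket and hand it to tipc_conn_rcv_sub(); any error closes the connection
 */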
static int tipc_conn_rcv_from_sock(struct tipc_conn *con)
{
	struct tipc_topsrv *srv = con->server;
	struct sock *sk = con->sock->sk;
	struct msghdr msg = {};
	struct tipc_subscr s;
	struct kvec iov;
	int ret;

	iov.iov_base = &s;
	iov.iov_len = sizeof(s);
	msg.msg_name = NULL;
	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
	ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
	if (ret == -EWOULDBLOCK)
		return -EWOULDBLOCK;
	if (ret == sizeof(s)) {
		read_lock_bh(&sk->sk_callback_lock);
		ret = tipc_conn_rcv_sub(srv, con, &s);
		read_unlock_bh(&sk->sk_callback_lock);
		if (!ret)
			return 0;
	}

	tipc_conn_close(con);
	return ret;
}
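/* tipc_conn_recv_work - workqueue handler draining inbound subscription
 * requests, rescheduling periodically so the Rx path is not monopolized
 */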
static void tipc_conn_recv_work(struct work_struct *work)
{
	struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
	int count = 0;

	while (connected(con)) {
		if (tipc_conn_rcv_from_sock(con))
			break;

		/* Don't flood Rx machine */
		if (++count >= MAX_RECV_MSG_COUNT) {
			cond_resched();
			count = 0;
		}
	}
	conn_put(con);
}
/* tipc_conn_data_ready - interrupt callback indicating the socket has data
 * The queued work is launched into tipc_conn_recv_work()->tipc_conn_rcv_from_sock()
 */
static void tipc_conn_data_ready(struct sock *sk)
{
	struct tipc_conn *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sk->sk_user_data;
	if (connected(con)) {
		conn_get(con);
		if (!queue_work(con->server->rcv_wq, &con->rwork))
			conn_put(con);
	}
	read_unlock_bh(&sk->sk_callback_lock);
}
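/* tipc_topsrv_accept - accept all pending connections on the listener
 * socket, allocate a connection object for each and register the socket
 * callbacks that drive the receive and send workqueues
 */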
static void tipc_topsrv_accept(struct work_struct *work)
{
	struct tipc_topsrv *srv = container_of(work, struct tipc_topsrv, awork);
	struct socket *lsock = srv->listener;
	struct socket *newsock;
	struct tipc_conn *con;
	struct sock *newsk;
	int ret;

	while (1) {
		ret = kernel_accept(lsock, &newsock, O_NONBLOCK);
		if (ret < 0)
			return;
		con = tipc_conn_alloc(srv);
		if (IS_ERR(con)) {
			sock_release(newsock);
			return;
		}
		/* Register callbacks */
		newsk = newsock->sk;
		write_lock_bh(&newsk->sk_callback_lock);
		newsk->sk_data_ready = tipc_conn_data_ready;
		newsk->sk_write_space = tipc_conn_write_space;
		newsk->sk_user_data = con;
		con->sock = newsock;
		write_unlock_bh(&newsk->sk_callback_lock);

		/* Wake up receive process in case of 'SYN+' message */
		newsk->sk_data_ready(newsk);
	}
}
/* tipc_topsrv_listener_data_ready - interrupt callback with connection request
 * The queued job is launched into tipc_topsrv_accept()
 */
static void tipc_topsrv_listener_data_ready(struct sock *sk)
{
	struct tipc_topsrv *srv;

	read_lock_bh(&sk->sk_callback_lock);
	srv = sk->sk_user_data;
	if (srv->listener)
		queue_work(srv->rcv_wq, &srv->awork);
	read_unlock_bh(&sk->sk_callback_lock);
}
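/* tipc_topsrv_create_listener - create and bind the topology server's
 * listening socket on the well-known {TIPC_TOP_SRV, TIPC_TOP_SRV} address
 */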
static int tipc_topsrv_create_listener(struct tipc_topsrv *srv)
{
	int imp = TIPC_CRITICAL_IMPORTANCE;
	struct socket *lsock = NULL;
	struct sockaddr_tipc saddr;
	struct sock *sk;
	int rc;

	rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock);
	if (rc < 0)
		return rc;

	srv->listener = lsock;
	sk = lsock->sk;
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_data_ready = tipc_topsrv_listener_data_ready;
	sk->sk_user_data = srv;
	write_unlock_bh(&sk->sk_callback_lock);

	rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE,
			       (char *)&imp, sizeof(imp));
	if (rc < 0)
		goto err;

	saddr.family = AF_TIPC;
	saddr.addrtype = TIPC_ADDR_NAMESEQ;
	saddr.addr.nameseq.type = TIPC_TOP_SRV;
	saddr.addr.nameseq.lower = TIPC_TOP_SRV;
	saddr.addr.nameseq.upper = TIPC_TOP_SRV;
	saddr.scope = TIPC_NODE_SCOPE;

	rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr));
	if (rc < 0)
		goto err;
	rc = kernel_listen(lsock, 0);
	if (rc < 0)
		goto err;

	/* As the server's listening socket owner and creator is the same
	 * module, we have to decrease the TIPC module reference count to
	 * guarantee that it remains zero after the server socket is created.
	 * Otherwise, executing "rmmod" cannot remove the TIPC module even
	 * though it was inserted successfully.
	 *
	 * However, sock_create_kern() increases the reference count twice:
	 * once for the owner of the TIPC socket's proto_ops struct, and once
	 * for the owner of the TIPC proto struct. Therefore, we must
	 * decrement the module reference count twice to ensure that it keeps
	 * zero after the server's listening socket is created. Of course, we
	 * must bump the module reference count twice as well before the
	 * socket is closed.
	 */
	module_put(lsock->ops->owner);
	module_put(sk->sk_prot_creator->owner);

	return 0;
err:
	sock_release(lsock);
	return -EINVAL;
}
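/* tipc_topsrv_kern_subscr - subscribe on behalf of a kernel-internal user.
 * No socket is attached (con->sock stays NULL), so matching events are
 * delivered through tipc_topsrv_kern_evt() instead of kernel_sendmsg().
 */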
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
			     u32 upper, u32 filter, int *conid)
{
	struct tipc_subscr sub;
	struct tipc_conn *con;
	int rc;

	sub.seq.type = type;
	sub.seq.lower = lower;
	sub.seq.upper = upper;
	sub.timeout = TIPC_WAIT_FOREVER;
	sub.filter = filter;
	*(u32 *)&sub.usr_handle = port;

	con = tipc_conn_alloc(tipc_topsrv(net));
	if (IS_ERR(con))
		return false;

	*conid = con->conid;
	con->sock = NULL;
	rc = tipc_conn_rcv_sub(tipc_topsrv(net), con, &sub);
	if (rc >= 0)
		return true;
	conn_put(con);
	return false;
}
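/* tipc_topsrv_kern_unsubscr - remove a kernel-internal subscription; one
 * conn_put() balances the reference taken in tipc_conn_lookup(), the other
 * drops the connection's initial reference so it is released
 */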
void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
{
	struct tipc_conn *con;

	con = tipc_conn_lookup(tipc_topsrv(net), conid);
	if (!con)
		return;

	test_and_clear_bit(CF_CONNECTED, &con->flags);
	tipc_conn_delete_sub(con, NULL);
	conn_put(con);
	conn_put(con);
}
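/* tipc_topsrv_kern_evt - wrap an event in a TIPC message and loop it back
 * to the subscribing in-kernel port via the local receive path
 */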
static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt)
{
	u32 port = *(u32 *)&evt->s.usr_handle;
	u32 self = tipc_own_addr(net);
	struct sk_buff_head evtq;
	struct sk_buff *skb;

	skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
			      self, self, port, port, 0);
	if (!skb)
		return;
	msg_set_dest_droppable(buf_msg(skb), true);
	memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
	skb_queue_head_init(&evtq);
	__skb_queue_tail(&evtq, skb);
	tipc_sk_rcv(net, &evtq);
}
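/* tipc_topsrv_work_start - create the ordered receive and send workqueues;
 * on failure both are torn down so the caller sees a clean error
 */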
static int tipc_topsrv_work_start(struct tipc_topsrv *s)
{
	s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0);
	if (!s->rcv_wq) {
		pr_err("can't start tipc receive workqueue\n");
		return -ENOMEM;
	}

	s->send_wq = alloc_ordered_workqueue("tipc_send", 0);
	if (!s->send_wq) {
		pr_err("can't start tipc send workqueue\n");
		destroy_workqueue(s->rcv_wq);
		return -ENOMEM;
	}

	return 0;
}
static void tipc_topsrv_work_stop(struct tipc_topsrv *s)
{
	destroy_workqueue(s->rcv_wq);
	destroy_workqueue(s->send_wq);
}
static int tipc_topsrv_start(struct net *net)
{
	struct tipc_net *tn = tipc_net(net);
	const char name[] = "topology_server";
	struct tipc_topsrv *srv;
	int ret;

	srv = kzalloc(sizeof(*srv), GFP_ATOMIC);
	if (!srv)
		return -ENOMEM;

	srv->net = net;
	srv->max_rcvbuf_size = sizeof(struct tipc_subscr);
	INIT_WORK(&srv->awork, tipc_topsrv_accept);

	strscpy(srv->name, name, sizeof(srv->name));
	tn->topsrv = srv;
	atomic_set(&tn->subscription_count, 0);

	spin_lock_init(&srv->idr_lock);
	idr_init(&srv->conn_idr);
	srv->idr_in_use = 0;

	ret = tipc_topsrv_work_start(srv);
	if (ret < 0)
		return ret;

	ret = tipc_topsrv_create_listener(srv);
	if (ret < 0)
		tipc_topsrv_work_stop(srv);

	return ret;
}
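/* tipc_topsrv_stop - close all live connections, reclaim the module
 * references dropped at listener creation, and free the server instance
 */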
static void tipc_topsrv_stop(struct net *net)
{
	struct tipc_topsrv *srv = tipc_topsrv(net);
	struct socket *lsock = srv->listener;
	struct tipc_conn *con;
	int id;

	spin_lock_bh(&srv->idr_lock);
	for (id = 0; srv->idr_in_use; id++) {
		con = idr_find(&srv->conn_idr, id);
		if (con) {
			spin_unlock_bh(&srv->idr_lock);
			tipc_conn_close(con);
			spin_lock_bh(&srv->idr_lock);
		}
	}
	__module_get(lsock->ops->owner);
	__module_get(lsock->sk->sk_prot_creator->owner);
	srv->listener = NULL;
	spin_unlock_bh(&srv->idr_lock);
	sock_release(lsock);
	tipc_topsrv_work_stop(srv);
	idr_destroy(&srv->conn_idr);
	kfree(srv);
}
int __net_init tipc_topsrv_init_net(struct net *net)
{
	return tipc_topsrv_start(net);
}
void __net_exit tipc_topsrv_exit_net(struct net *net)
{
	tipc_topsrv_stop(net);
}