// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
*******************************************************************************
******************************************************************************/
/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever else it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It must be configured using a
 * cluster-wide mechanism as it has to be the same on all nodes of the
 * cluster for the DLM to function.
 */
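/*
 * Example (illustrative, userspace C; not part of this file): setting the
 * protocol cluster-wide usually means writing the same value to dlm's
 * configfs knob on every node before any lockspace is joined. The configfs
 * path below is an assumption based on the standard dlm configfs layout:
 *
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *
 *	// returns 0 on success; "0" selects TCP, "1" selects SCTP
 *	int dlm_set_protocol_sctp(void)
 *	{
 *		int fd = open("/sys/kernel/config/dlm/cluster/protocol",
 *			      O_WRONLY);
 *		if (fd < 0)
 *			return -1;
 *		if (write(fd, "1", 1) != 1) {
 *			close(fd);
 *			return -1;
 *		}
 *		return close(fd);
 *	}
 */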
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <net/ipv6.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>

#include <trace/events/dlm.h>
#include <trace/events/sock.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
#define DLM_MAX_PROCESS_BUFFERS 24
#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* this semaphore is used to allow parallel recv/send in read
	 * lock mode. When we release a sock we need to hold the write lock.
	 *
	 * However this is locking code and not nice. When we remove the
	 * othercon handling we can look into other mechanisms to synchronize
	 * io handling to call sock_release() at the right time.
	 */
	struct rw_semaphore sock_lock;
	unsigned long flags;
#define CF_APP_LIMITED 0
#define CF_RECV_PENDING 1
#define CF_SEND_PENDING 2
#define CF_RECV_INTR 3
#define CF_IO_STOP 4
#define CF_IS_OTHERCON 5
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
	struct hlist_node list;
	/* due to some connect()/accept() races we can currently end up with
	 * a second, crossed-over connection attempt for one node.
	 *
	 * There is a solution to avoid the race by introducing a connect
	 * rule, e.g. our_nodeid > nodeid_to_connect decides who is allowed
	 * to connect. The other side can still connect, but that will then
	 * only be taken as a sign that the other side wants a reconnect.
	 *
	 * However changing to this behaviour would break backwards
	 * compatibility. In a DLM protocol major version upgrade we should
	 * remove this!
	 */
	struct connection *othercon;
	struct work_struct rwork; /* receive worker */
	struct work_struct swork; /* send worker */
	wait_queue_head_t shutdown_wait;
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;
	unsigned int mark;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;
	struct rcu_head rcu;
};
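/*
 * Example of the sock_lock pattern described above (a sketch mirroring the
 * worker functions later in this file, not additional logic): I/O paths
 * take the semaphore in read mode so receive and send can run in parallel,
 * while paths that replace or release con->sock must take it in write mode:
 *
 *	down_read(&con->sock_lock);
 *	if (con->sock)
 *		... recv from / send to con->sock ...
 *	up_read(&con->sock_lock);
 *
 *	down_write(&con->sock_lock);
 *	dlm_close_sock(&con->sock);
 *	up_write(&con->sock_lock);
 */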
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
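/*
 * Worked example for the two macros above (assuming PAGE_SIZE == 4096): an
 * entry with offset == 0 and end == 1500 has DLM_WQ_LENGTH_BYTES(e) == 1500
 * bytes queued for transmission and DLM_WQ_REMAIN_BYTES(e) == 2596 bytes of
 * page space left, so a following message of up to 2596 bytes is appended
 * to the same page instead of allocating a new writequeue_entry.
 */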
/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	bool dirty;
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	char *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};
struct processqueue_entry {
	unsigned char *buf;
	int nodeid;
	int buflen;

	struct list_head list;
};
struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3
#define DLM_IO_FLUSH 4
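/*
 * Sketch of how the io workers below drive a socket with these return
 * codes (condensed from process_recv_sockets()/process_send_sockets()
 * further down; not additional logic):
 *
 *	do {
 *		ret = receive_from_sock(con, buflen);
 *	} while (ret == DLM_IO_SUCCESS);
 *
 *	switch (ret) {
 *	case DLM_IO_END:	// socket drained, pending bit was cleared
 *		break;
 *	case DLM_IO_RESCHED:	// back off, requeue the io work item
 *		queue_work(io_workqueue, &con->rwork);
 *		break;
 *	...
 *	}
 */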
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
static atomic_t processqueue_count;
static LIST_HEAD(processqueue);
bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}
static void lowcomms_queue_swork(struct connection *con)
{
	assert_spin_locked(&con->writequeue_lock);

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_bit(CF_APP_LIMITED, &con->flags) &&
	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
		queue_work(io_workqueue, &con->swork);
}
static void lowcomms_queue_rwork(struct connection *con)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
#endif

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
		queue_work(io_workqueue, &con->rwork);
}
static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}
struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}

struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return KMEM_CACHE(dlm_msg, 0);
}
/* must be called while holding writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
				     list);
	/* if len is zero nothing is to send, if there are users filling
	 * buffers we wait until the users are done so we can send more.
	 */
	if (!e || e->users || e->len == 0)
		return NULL;

	return e;
}
static int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}

static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}
static void dlm_con_init(struct connection *con, int nodeid)
{
	con->nodeid = nodeid;
	init_rwsem(&con->sock_lock);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	spin_lock_init(&con->addrs_lock);
	init_waitqueue_head(&con->shutdown_wait);
}
/*
 * If 'allocation' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	dlm_con_init(con, nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple cpu's. Instead of locking the hot path
	 * __find_con() we just check, in the rare case of recently added
	 * nodes, again under protection of connections_lock. If this is
	 * the case we abort our connection creation and return the
	 * existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}
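/*
 * Usage sketch for nodeid2con(): most callers do a lookup-only call under
 * the srcu read lock and treat a missing connection as an error; only
 * dlm_lowcomms_addr() passes a real gfp_t to create new entries:
 *
 *	idx = srcu_read_lock(&connections_srcu);
 *	con = nodeid2con(nodeid, 0);
 *	if (!con) {
 *		srcu_read_unlock(&connections_srcu, idx);
 *		return -ENOENT;
 *	}
 *	... use con, protected by the srcu read side ...
 *	srcu_read_unlock(&connections_srcu, idx);
 */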
static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
		con->curr_addr_index++;
		if (con->curr_addr_index == con->addr_count)
			con->curr_addr_index = 0;
	}

	*mark = con->mark;
	spin_unlock(&con->addrs_lock);

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out) {
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	if (dlm_local_addr[0].ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}
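/*
 * Example of the try_new_addr rotation above: with addr_count == 2 and
 * curr_addr_index == 0, a connect attempt gets addr[0] and advances the
 * index, the next try_new_addr == true call gets addr[1], then the index
 * wraps back to 0. Only the SCTP ops set try_new_addr (see dlm_sctp_ops
 * below); TCP always retries the current address.
 */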
static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
			  unsigned int *mark)
{
	struct connection *con;
	int i, idx, addr_i;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			WARN_ON_ONCE(!con->addr_count);

			spin_lock(&con->addrs_lock);
			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
				if (addr_compare(&con->addr[addr_i], addr)) {
					*nodeid = con->nodeid;
					*mark = con->mark;
					spin_unlock(&con->addrs_lock);
					srcu_read_unlock(&connections_srcu, idx);
					return 0;
				}
			}
			spin_unlock(&con->addrs_lock);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return -ENOENT;
}
static bool dlm_lowcomms_con_has_addr(const struct connection *con,
				      const struct sockaddr_storage *addr)
{
	int i;

	for (i = 0; i < con->addr_count; i++) {
		if (addr_compare(&con->addr[i], addr))
			return true;
	}

	return false;
}
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
{
	struct connection *con;
	bool ret;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOMEM;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		memcpy(&con->addr[0], addr, sizeof(*addr));
		con->addr_count = 1;
		con->mark = dlm_config.ci_mark;
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	ret = dlm_lowcomms_con_has_addr(con, addr);
	if (ret) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -EEXIST;
	}

	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOSPC;
	}

	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
	srcu_read_unlock(&connections_srcu, idx);
	spin_unlock(&con->addrs_lock);
	return 0;
}
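/*
 * Usage sketch (as a cluster manager configuration path might call it; the
 * surrounding error policy is an assumption, not code from this file):
 *
 *	struct sockaddr_storage addr;
 *	int err;
 *
 *	... fill addr with one IPv4/IPv6 address of node 'nodeid' ...
 *	err = dlm_lowcomms_addr(nodeid, &addr);
 *	if (err && err != -EEXIST)
 *		log_print("cannot add address for node %d: %d", nodeid, err);
 */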
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	trace_sk_data_ready(sk);

	set_bit(CF_RECV_INTR, &con->flags);
	lowcomms_queue_rwork(con);
}
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	spin_lock_bh(&con->writequeue_lock);
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	lowcomms_queue_swork(con);
	spin_unlock_bh(&con->writequeue_lock);
}
static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here.
	 */
	if (sk->sk_shutdown == RCV_SHUTDOWN)
		lowcomms_data_ready(sk);
}
static void lowcomms_listen_data_ready(struct sock *sk)
{
	trace_sk_data_ready(sk);

	queue_work(io_workqueue, &listen_con.rwork);
}
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	down_read(&con->sock_lock);
	if (!con->sock) {
		spin_lock_bh(&con->writequeue_lock);
		lowcomms_queue_swork(con);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_read(&con->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	cond_resched();
	return 0;
}
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	listen_sock.sk_error_report(sk);
}
static void restore_callbacks(struct sock *sk)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
#endif

	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}
/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_use_task_frag = false;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}
/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, __be16 port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = port;
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = port;
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}
static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}
static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}
static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}
static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	spin_lock_bh(&con->writequeue_lock);
	set_bit(CF_IO_STOP, &con->flags);
	spin_unlock_bh(&con->writequeue_lock);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);
		release_sock(con->sock->sk);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* If we sent a writequeue entry only part way, drop the whole
	 * entry: after a reconnection we must not resume in the middle
	 * of a message, which would confuse the other end.
	 *
	 * Dropping whole messages is always safe because they get
	 * retransmitted; what we cannot allow is to transmit half a
	 * message which may then be processed at the other side.
	 *
	 * Our policy is to start from a clean state on disconnect,
	 * since we don't know what was sent/received at the transport
	 * layer in this case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}
static void shutdown_connection(struct connection *con, bool and_other)
{
	int ret;

	if (con->othercon && and_other)
		shutdown_connection(con->othercon, false);

	flush_workqueue(io_workqueue);
	down_read(&con->sock_lock);
	/* nothing to shutdown */
	if (!con->sock) {
		up_read(&con->sock_lock);
		return;
	}

	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	up_read(&con->sock_lock);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait, !con->sock,
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	close_connection(con, false);
}
static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{
	struct processqueue_entry *pentry;

	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
	if (!pentry)
		return NULL;

	pentry->buf = kmalloc(buflen, GFP_NOFS);
	if (!pentry->buf) {
		kfree(pentry);
		return NULL;
	}

	pentry->nodeid = nodeid;
	return pentry;
}

static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}
static void process_dlm_messages(struct work_struct *work)
{
	struct processqueue_entry *pentry;

	spin_lock_bh(&processqueue_lock);
	pentry = list_first_entry_or_null(&processqueue,
					  struct processqueue_entry, list);
	if (WARN_ON_ONCE(!pentry)) {
		process_dlm_messages_pending = false;
		spin_unlock_bh(&processqueue_lock);
		return;
	}

	list_del(&pentry->list);
	if (atomic_dec_and_test(&processqueue_count))
		wake_up(&processqueue_wq);
	spin_unlock_bh(&processqueue_lock);

	for (;;) {
		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
		free_processqueue_entry(pentry);

		spin_lock_bh(&processqueue_lock);
		pentry = list_first_entry_or_null(&processqueue,
						  struct processqueue_entry, list);
		if (!pentry) {
			process_dlm_messages_pending = false;
			spin_unlock_bh(&processqueue_lock);
			break;
		}

		list_del(&pentry->list);
		if (atomic_dec_and_test(&processqueue_count))
			wake_up(&processqueue_wq);
		spin_unlock_bh(&processqueue_lock);
	}
}
/* Data received from remote end */
static int receive_from_sock(struct connection *con, int buflen)
{
	struct processqueue_entry *pentry;
	int ret, buflen_real;
	struct msghdr msg;
	struct kvec iov;

	pentry = new_processqueue_entry(con->nodeid, buflen);
	if (!pentry)
		return DLM_IO_RESCHED;

	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);

	/* calculate new buffer parameters regarding last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = pentry->buf + con->rx_leftover;
	iov.iov_len = buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	clear_bit(CF_RECV_INTR, &con->flags);
again:
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	trace_dlm_recv(con->nodeid, ret);
	if (ret == -EAGAIN) {
		lock_sock(con->sock->sk);
		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
			release_sock(con->sock->sk);
			goto again;
		}

		clear_bit(CF_RECV_PENDING, &con->flags);
		release_sock(con->sock->sk);
		free_processqueue_entry(pentry);
		return DLM_IO_END;
	} else if (ret == 0) {
		/* close will clear CF_RECV_PENDING */
		free_processqueue_entry(pentry);
		return DLM_IO_EOF;
	} else if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	/* new buflen according to received bytes and leftover from last receive */
	buflen_real = ret + con->rx_leftover;
	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
					   buflen_real);
	if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	pentry->buflen = ret;

	/* calculate leftover bytes from process and put them at the start
	 * of the receive buffer, so on the next receive we have the full
	 * message at the start address of the receive buffer.
	 */
	con->rx_leftover = buflen_real - ret;
	memmove(con->rx_leftover_buf, pentry->buf + ret,
		con->rx_leftover);

	spin_lock_bh(&processqueue_lock);
	ret = atomic_inc_return(&processqueue_count);
	list_add_tail(&pentry->list, &processqueue);
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock_bh(&processqueue_lock);

	if (ret > DLM_MAX_PROCESS_BUFFERS)
		return DLM_IO_FLUSH;

	return DLM_IO_SUCCESS;
}
/* Listening socket is busy, accept a connection */
static int accept_from_sock(void)
{
	struct sockaddr_storage peeraddr;
	int len, idx, result, nodeid;
	struct connection *newcon;
	struct socket *newsock;
	unsigned int mark;

	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
	if (result == -EAGAIN)
		return DLM_IO_END;
	else if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 *  In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we received something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	} else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);

		/* check if we received something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}
/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}
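/*
 * Worked example: a page carrying three 100 byte messages has offset == 0
 * and len == 300. After sock_sendmsg() reports 150 bytes sent,
 * writequeue_entry_complete(e, 150) leaves offset == 150, len == 150 and
 * dirty == true; should the connection drop now, close_connection() frees
 * the whole entry instead of resuming in the middle of the second message.
 */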
/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, __be16 port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}
/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
	}
}
static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
	struct writequeue_entry *entry;

	entry = dlm_allocate_writequeue();
	if (!entry)
		return NULL;

	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
	if (!entry->page) {
		dlm_free_writequeue(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->dirty = false;
	entry->con = con;
	entry->users = 1;
	kref_init(&entry->ref);
	return entry;
}
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     char **ppc, void (*cb)(void *data),
					     void *data)
{
	struct writequeue_entry *e;

	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(data);

			e->end += len;
			e->users++;
			goto out;
		}
	}

	e = new_writequeue_entry(con);
	if (!e)
		goto out;

	kref_get(&e->ref);
	*ppc = page_address(e->page);
	e->end += len;
	if (cb)
		cb(data);

	list_add_tail(&e->list, &con->writequeue);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return e;
}
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						char **ppc, void (*cb)(void *data),
						void *data)
{
	struct writequeue_entry *e;
	struct dlm_msg *msg;

	msg = dlm_allocate_msg();
	if (!msg)
		return NULL;

	kref_init(&msg->ref);

	e = new_wq_entry(con, len, ppc, cb, data);
	if (!e) {
		dlm_free_msg(msg);
		return NULL;
	}

	msg->retransmit = false;
	msg->orig_msg = NULL;
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}
/* avoid false positive for nodes_srcu, unlock happens in
 * dlm_lowcomms_commit_msg which is a must call if successful
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
				     void (*cb)(void *data), void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON_ONCE(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* for dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* we assume that if this was successful, commit must be called */
	msg->idx = idx;
	return msg;
}
#endif
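/*
 * Usage sketch for the new/commit API (mirroring how the midcomms layer
 * drives it; the buffer contents here are an assumption):
 *
 *	char *ppc;
 *	struct dlm_msg *msg;
 *
 *	msg = dlm_lowcomms_new_msg(nodeid, len, &ppc, NULL, NULL);
 *	if (!msg)
 *		return -ENOMEM;
 *
 *	memcpy(ppc, buf, len);		// fill the reserved writequeue space
 *	dlm_lowcomms_commit_msg(msg);	// queue it; unlocks srcu and drops
 *					// the extra reference new_msg() took
 *
 * A caller that keeps 'msg' around (e.g. for retransmit bookkeeping) holds
 * its own kref and releases it later with dlm_lowcomms_put_msg().
 */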
static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock_bh(&con->writequeue_lock);
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);

	lowcomms_queue_swork(con);

out:
	spin_unlock_bh(&con->writequeue_lock);
}
/* avoid false positive for nodes_srcu, the lock was taken in
 * dlm_lowcomms_new_msg
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* because dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}
/* does not hold connections_srcu, used by lowcomms_error_report only */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
					      NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}
/* Send a message */
static int send_to_sock(struct connection *con)
{
	struct writequeue_entry *e;
	struct bio_vec bvec;
	struct msghdr msg = {
		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
	};
	int len, offset, ret;

	spin_lock_bh(&con->writequeue_lock);
	e = con_next_wq(con);
	if (!e) {
		clear_bit(CF_SEND_PENDING, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		return DLM_IO_END;
	}

	len = e->len;
	offset = e->offset;
	WARN_ON_ONCE(len == 0 && e->users == 0);
	spin_unlock_bh(&con->writequeue_lock);

	bvec_set_page(&bvec, e->page, len, offset);
	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
	ret = sock_sendmsg(con->sock, &msg);
	trace_dlm_send(con->nodeid, ret);
	if (ret == -EAGAIN || ret == 0) {
		lock_sock(con->sock->sk);
		spin_lock_bh(&con->writequeue_lock);
		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
			/* Notify TCP that we're limited by the
			 * application window size.
			 */
			set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
			con->sock->sk->sk_write_pending++;

			clear_bit(CF_SEND_PENDING, &con->flags);
			spin_unlock_bh(&con->writequeue_lock);
			release_sock(con->sock->sk);

			/* wait for write_space() event */
			return DLM_IO_END;
		}
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);

		return DLM_IO_RESCHED;
	} else if (ret < 0) {
		return ret;
	}

	spin_lock_bh(&con->writequeue_lock);
	writequeue_entry_complete(e, ret);
	spin_unlock_bh(&con->writequeue_lock);

	return DLM_IO_SUCCESS;
}
static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock_bh(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);
}
static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	WARN_ON_ONCE(!list_empty(&con->writequeue));
	WARN_ON_ONCE(con->sock);
	kfree(con);
}
/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	int idx;

	log_print("closing connection to node %d", nodeid);

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	stop_connection_io(con);
	log_print("io handling for node: %d stopped", nodeid);
	close_connection(con, true);

	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);

	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
	}
	srcu_read_unlock(&connections_srcu, idx);

	/* for debugging we print when we are done to compare with other
	 * messages in between. This function needs to be correctly
	 * synchronized with io handling.
	 */
	log_print("closing connection to node %d done", nodeid);

	return 0;
}
/* Receive worker function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int ret, buflen;

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		return;
	}

	buflen = READ_ONCE(dlm_config.ci_buffer_size);
	do {
		ret = receive_from_sock(con, buflen);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_RECV_PENDING cleared */
		break;
	case DLM_IO_EOF:
		close_connection(con, false);
		wake_up(&con->shutdown_wait);
		/* CF_RECV_PENDING cleared */
		break;
	case DLM_IO_FLUSH:
		/* we can't flush the process_workqueue here because a
		 * WQ_MEM_RECLAIM workqueue can cause a deadlock for a non
		 * WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead
		 * we have a waitqueue to wait until all messages are
		 * processed.
		 *
		 * This handling is only necessary to back off the sender and
		 * not queue all messages from the socket layer into the DLM
		 * processqueue. When DLM is capable of parsing multiple
		 * messages on e.g. a per socket basis this handling might be
		 * removed. Especially in a message burst we are too slow to
		 * process messages and the queue will fill up memory.
		 */
		wait_event(processqueue_wq, !atomic_read(&processqueue_count));
		fallthrough;
	case DLM_IO_RESCHED:
		cond_resched();
		queue_work(io_workqueue, &con->rwork);
		/* CF_RECV_PENDING not cleared */
		break;
	default:
		if (ret < 0) {
			if (test_bit(CF_IS_OTHERCON, &con->flags)) {
				close_connection(con, false);
			} else {
				spin_lock_bh(&con->writequeue_lock);
				lowcomms_queue_swork(con);
				spin_unlock_bh(&con->writequeue_lock);
			}

			/* CF_RECV_PENDING cleared for othercon
			 * we trigger send queue if not already done
			 * and process_send_sockets will handle it
			 */
			break;
		}
		WARN_ON_ONCE(1);
		break;
	}
}
static void process_listen_recv_socket(struct work_struct *work)
{
	int ret;

	if (WARN_ON_ONCE(!listen_con.sock))
		return;

	do {
		ret = accept_from_sock();
	} while (ret == DLM_IO_SUCCESS);

	if (ret < 0)
		log_print("critical error accepting connection: %d", ret);
}
static int dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return result;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		return result;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	result = dlm_proto_ops->bind(sock);
	if (result < 0) {
		sock_release(sock);
		return result;
	}

	add_sock(sock, con);

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = kernel_connect(sock, (struct sockaddr *)&addr, addr_len, 0);
	switch (result) {
	case -EINPROGRESS:
		/* not an error */
		fallthrough;
	case 0:
		break;
	default:
		dlm_close_sock(&con->sock);
		break;
	}

	return result;
}
/* Send worker function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);
	int ret;

	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		down_write(&con->sock_lock);
		if (!con->sock) {
			ret = dlm_connect(con);
			switch (ret) {
			case 0:
				break;
			default:
				/* CF_SEND_PENDING not cleared */
				up_write(&con->sock_lock);
				log_print("connect to node %d try %d error %d",
					  con->nodeid, con->retries++, ret);
				msleep(1000);
				/* For now we try forever to reconnect. In
				 * future we should send an event to the
				 * cluster manager to fence itself after a
				 * certain amount of retries.
				 */
				queue_work(io_workqueue, &con->swork);
				return;
			}
		}
		downgrade_write(&con->sock_lock);
	}

	do {
		ret = send_to_sock(con);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_SEND_PENDING cleared */
		break;
	case DLM_IO_RESCHED:
		/* CF_SEND_PENDING not cleared */
		cond_resched();
		queue_work(io_workqueue, &con->swork);
		break;
	default:
		if (ret < 0) {
			close_connection(con, false);

			/* CF_SEND_PENDING cleared */
			spin_lock_bh(&con->writequeue_lock);
			lowcomms_queue_swork(con);
			spin_unlock_bh(&con->writequeue_lock);
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}
static void work_stop(void)
{
	if (io_workqueue) {
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
	}

	if (process_workqueue) {
		destroy_workqueue(process_workqueue);
		process_workqueue = NULL;
	}
}
static int work_start(void)
{
	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
				       WQ_UNBOUND, 0);
	if (!io_workqueue) {
		log_print("can't start dlm_io");
		return -ENOMEM;
	}

	process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
	if (!process_workqueue) {
		log_print("can't start dlm_process");
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}
void dlm_lowcomms_shutdown(void)
{
	struct connection *con;
	int i, idx;

	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			shutdown_connection(con, true);
			stop_connection_io(con);
			flush_workqueue(process_workqueue);
			close_connection(con, true);

			clean_one_writequeue(con);
			if (con->othercon)
				clean_one_writequeue(con->othercon);
			allow_connection_io(con);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}
void dlm_lowcomms_stop(void)
{
	work_stop();
	dlm_proto_ops = NULL;
}
static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket: %d", result);
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;

	lock_sock(sock->sk);
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space = sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_use_task_frag = false;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	result = sock->ops->listen(sock, 128);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}
static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = kernel_bind(sock, (struct sockaddr *)&src_addr,
			     addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	return 0;
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
		return -EINVAL;
	}

	return 0;
}
static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}

static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return kernel_bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			   addr_len);
}
static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};
static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
		log_print("SCTP is not enabled by this kernel");
		return -EOPNOTSUPP;
	}

	request_module("sctp");
	return 0;
}

static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}
static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};
int dlm_lowcomms_start(void)
{
	int error;

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	work_stop();
fail:
	return error;
}
void dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}
void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}