1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
17 * This is the "low-level" comms layer.
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is its
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
29 * The comms level is two kernel threads: a receive thread that deals mainly with
30 * receiving messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never (well, hardly ever) waits.
45 #include <asm/ioctls.h>
48 #include <net/sctp/user.h>
49 #include <linux/pagemap.h>
50 #include <linux/socket.h>
51 #include <linux/idr.h>
53 #include "dlm_internal.h"
58 static struct sockaddr_storage
*dlm_local_addr
[DLM_MAX_ADDR_COUNT
];
59 static int dlm_local_count
;
60 static int dlm_local_nodeid
;
62 /* One of these per connected node (looked up by nodeid through
 * nodeid2nodeinfo()). */
/* Bit numbers in ni->flags, used with the atomic bit helpers:
 *   NI_INIT_PENDING  - set just before initiate_association() is called and
 *                      cleared when the association comes up or the INIT
 *                      has to be retried;
 *   NI_WRITE_PENDING - set while the node is linked on write_nodes. */
64 #define NI_INIT_PENDING 1
65 #define NI_WRITE_PENDING 2
/* SCTP association id of the node; send_to_sock() treats 0 as "none yet" */
69 sctp_assoc_t assoc_id
;
71 struct list_head write_list
; /* link on the global write_nodes list (nodes with pending writes) */
72 struct list_head writequeue
; /* outgoing writequeue_entries */
73 spinlock_t writequeue_lock
; /* taken around every access to writequeue in this file */
77 static DEFINE_IDR(nodeinfo_idr
);
78 static struct rw_semaphore nodeinfo_lock
;
79 static int max_nodeid
;
87 /* Just the one of these, now. But this struct keeps
 * the connection-specific variables together.
 * The single instance is sctp_con. */
/* Bit in sctp_con.flags: set by lowcomms_data_ready(), tested and cleared
 * by dlm_recvd. */
90 #define CF_READ_PENDING 1
/* Incremented by lowcomms_data_ready() each time it is called */
96 atomic_t waiting_requests
;
101 /* An entry waiting to be sent.
 * Entries are linked on a nodeinfo's writequeue.  Code in this file also
 * uses the members page, offset, len, end, users and ni; only the list
 * link is visible in this part of the definition. */
103 struct writequeue_entry
{
104 struct list_head list
;
/*
 * Circular-buffer helpers.  'cb' must point to an object with integer
 * members base (offset of the first used byte), len (number of used bytes)
 * and mask.  CBUF_INIT sets mask to size-1, so the '& mask' wrap-around
 * below is only correct when the buffer size is a power of two.
 *
 * CBUF_ADD(cb, n)      account for n more bytes of data in the buffer
 * CBUF_EMPTY(cb)       non-zero when the buffer holds no data
 * CBUF_MAY_ADD(cb, n)  non-zero when len + n is still below the buffer size
 * CBUF_DATA(cb)        offset just past the used region, i.e. where the
 *                      next byte would be written
 *
 * Every macro argument is parenthesized so that arbitrary expressions can
 * be passed safely; note that 'cb' is evaluated more than once.
 */
#define CBUF_ADD(cb, n) do { (cb)->len += (n); } while (0)
#define CBUF_EMPTY(cb) ((cb)->len == 0)
#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
/* CBUF_INIT(cb, size): reset base and len to 0 and set mask to size-1.
 * CBUF_EAT(cb, n): consume data from the front of the buffer; the visible
 * line wraps base with the mask (the lines that adjust len and base by n
 * are not visible in this copy). */
118 #define CBUF_INIT(cb, size) \
120 (cb)->base = (cb)->len = 0; \
121 (cb)->mask = ((size)-1); \
124 #define CBUF_EAT(cb, n) \
128 (cb)->base &= (cb)->mask; \
132 /* List of nodes which have writes pending */
133 static struct list_head write_nodes
;
134 static spinlock_t write_nodes_lock
;
136 /* Maximum number of incoming messages to process before
139 #define MAX_RX_MSG_COUNT 25
142 static struct task_struct
*recv_task
;
143 static struct task_struct
*send_task
;
144 static wait_queue_head_t lowcomms_recv_wait
;
145 static atomic_t accepting
;
147 /* The SCTP connection */
148 static struct connection sctp_con
;
/*
 * Look up the address of @nodeid with dlm_nodeid_to_addr() and copy only the
 * IP address part into @retaddr: sin_addr for AF_INET, otherwise sin6_addr.
 * The family is decided by dlm_local_addr[0], not by the looked-up address.
 * The visible lines do not write the family or port of @retaddr (callers in
 * this file follow up with make_sockaddr()).  dlm_local_count is checked
 * before the lookup.
 */
151 static int nodeid_to_addr(int nodeid
, struct sockaddr
*retaddr
)
153 struct sockaddr_storage addr
;
156 if (!dlm_local_count
)
159 error
= dlm_nodeid_to_addr(nodeid
, &addr
);
163 if (dlm_local_addr
[0]->ss_family
== AF_INET
) {
164 struct sockaddr_in
*in4
= (struct sockaddr_in
*) &addr
;
165 struct sockaddr_in
*ret4
= (struct sockaddr_in
*) retaddr
;
166 ret4
->sin_addr
.s_addr
= in4
->sin_addr
.s_addr
;
168 struct sockaddr_in6
*in6
= (struct sockaddr_in6
*) &addr
;
169 struct sockaddr_in6
*ret6
= (struct sockaddr_in6
*) retaddr
;
170 memcpy(&ret6
->sin6_addr
, &in6
->sin6_addr
,
171 sizeof(in6
->sin6_addr
));
/*
 * Return the nodeinfo registered for @nodeid in nodeinfo_idr.
 *
 * The lookup is first done under the read side of nodeinfo_lock.  The
 * creation path then takes the write side, repeats the lookup (another task
 * may have inserted the entry in between), reserves idr memory with
 * idr_pre_get(), allocates the structure with kmalloc() using @alloc,
 * inserts it with idr_get_new_above() and initialises it: zeroed, both
 * spinlocks and the writequeue list head set up.  idr_remove() is called on
 * one path with the id that was handed out (presumably when it differs from
 * @nodeid - the test is not visible here).  @nodeid is compared against
 * max_nodeid before the write lock is dropped.
 *
 * Callers that only want a lookup pass alloc == 0.
 */
177 static struct nodeinfo
*nodeid2nodeinfo(int nodeid
, gfp_t alloc
)
183 down_read(&nodeinfo_lock
);
184 ni
= idr_find(&nodeinfo_idr
, nodeid
);
185 up_read(&nodeinfo_lock
);
188 down_write(&nodeinfo_lock
);
190 ni
= idr_find(&nodeinfo_idr
, nodeid
);
194 r
= idr_pre_get(&nodeinfo_idr
, alloc
);
198 ni
= kmalloc(sizeof(struct nodeinfo
), alloc
);
202 r
= idr_get_new_above(&nodeinfo_idr
, ni
, nodeid
, &n
);
209 idr_remove(&nodeinfo_idr
, n
);
214 memset(ni
, 0, sizeof(struct nodeinfo
));
215 spin_lock_init(&ni
->lock
);
216 INIT_LIST_HEAD(&ni
->writequeue
);
217 spin_lock_init(&ni
->writequeue_lock
);
220 if (nodeid
> max_nodeid
)
223 up_write(&nodeinfo_lock
);
229 /* Don't call this too often...
 * Reverse lookup from an SCTP association id to a nodeinfo.  This is a
 * linear scan: nodeid2nodeinfo(i, 0) is called for every i from 1 to
 * max_nodeid and each non-NULL result has its assoc_id compared with
 * @assoc. */
230 static struct nodeinfo
*assoc2nodeinfo(sctp_assoc_t assoc
)
235 for (i
=1; i
<=max_nodeid
; i
++) {
236 ni
= nodeid2nodeinfo(i
, 0);
237 if (ni
&& ni
->assoc_id
== assoc
)
243 /* Data or notification available on socket.
 * Installed as the socket's sk_data_ready callback by init_sock().  Counts
 * the event in sctp_con.waiting_requests, sets CF_READ_PENDING with
 * test_and_set_bit() and wakes anyone sleeping on lowcomms_recv_wait
 * (i.e. dlm_recvd).  @sk and @count_unused are not used by the visible
 * lines. */
244 static void lowcomms_data_ready(struct sock
*sk
, int count_unused
)
246 atomic_inc(&sctp_con
.waiting_requests
);
247 if (test_and_set_bit(CF_READ_PENDING
, &sctp_con
.flags
))
250 wake_up_interruptible(&lowcomms_recv_wait
);
254 /* Add the port number to an IP6 or 4 sockaddr and report the address
 * length through *addr_len (the function itself returns void).
 * Also pad out the struct with zeros to make comparisons meaningful.
 *
 * The family written to @saddr is that of dlm_local_addr[0].  For AF_INET
 * sin_zero and everything in the sockaddr_storage beyond the sockaddr_in are
 * zeroed; for IPv6 everything beyond the sockaddr_in6 is zeroed.  The
 * address bytes themselves are not written by the visible lines.
 *
 * There is also a path that replaces @port with the port stored in
 * dlm_local_addr[0]; its condition is not visible here (presumably it
 * applies when @port is 0, which is what process_sctp_notification()
 * passes - TODO confirm). */
257 static void make_sockaddr(struct sockaddr_storage
*saddr
, uint16_t port
,
260 struct sockaddr_in
*local4_addr
;
261 struct sockaddr_in6
*local6_addr
;
263 if (!dlm_local_count
)
267 if (dlm_local_addr
[0]->ss_family
== AF_INET
) {
268 local4_addr
= (struct sockaddr_in
*)dlm_local_addr
[0];
269 port
= be16_to_cpu(local4_addr
->sin_port
);
271 local6_addr
= (struct sockaddr_in6
*)dlm_local_addr
[0];
272 port
= be16_to_cpu(local6_addr
->sin6_port
);
276 saddr
->ss_family
= dlm_local_addr
[0]->ss_family
;
277 if (dlm_local_addr
[0]->ss_family
== AF_INET
) {
278 struct sockaddr_in
*in4_addr
= (struct sockaddr_in
*)saddr
;
279 in4_addr
->sin_port
= cpu_to_be16(port
);
280 memset(&in4_addr
->sin_zero
, 0, sizeof(in4_addr
->sin_zero
));
281 memset(in4_addr
+1, 0, sizeof(struct sockaddr_storage
) -
282 sizeof(struct sockaddr_in
));
283 *addr_len
= sizeof(struct sockaddr_in
);
285 struct sockaddr_in6
*in6_addr
= (struct sockaddr_in6
*)saddr
;
286 in6_addr
->sin6_port
= cpu_to_be16(port
);
287 memset(in6_addr
+1, 0, sizeof(struct sockaddr_storage
) -
288 sizeof(struct sockaddr_in6
));
289 *addr_len
= sizeof(struct sockaddr_in6
);
293 /* Close the connection and tidy up.
 * Releases sctp_con.sock with sock_release() and frees the receive page if
 * one was allocated; both pointers are reset to NULL afterwards so the
 * state can be rebuilt later. */
294 static void close_connection(void)
297 sock_release(sctp_con
.sock
);
298 sctp_con
.sock
= NULL
;
301 if (sctp_con
.rx_page
) {
302 __free_page(sctp_con
.rx_page
);
303 sctp_con
.rx_page
= NULL
;
307 /* We only send shutdown messages to nodes that are not part of the cluster.
 * Builds a message with no destination address and no payload (NULL iovec,
 * zero length) whose only content is an SCTP_SNDRCV control message with
 * MSG_EOF set in sinfo_flags and sinfo_assoc_id = @associd, and sends it on
 * sctp_con.sock.  A failure is only logged.
 * NOTE: outcmsg is a static buffer, so this function is not reentrant. */
308 static void send_shutdown(sctp_assoc_t associd
)
310 static char outcmsg
[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo
))];
311 struct msghdr outmessage
;
312 struct cmsghdr
*cmsg
;
313 struct sctp_sndrcvinfo
*sinfo
;
316 outmessage
.msg_name
= NULL
;
317 outmessage
.msg_namelen
= 0;
318 outmessage
.msg_control
= outcmsg
;
319 outmessage
.msg_controllen
= sizeof(outcmsg
);
320 outmessage
.msg_flags
= MSG_EOR
;
322 cmsg
= CMSG_FIRSTHDR(&outmessage
);
323 cmsg
->cmsg_level
= IPPROTO_SCTP
;
324 cmsg
->cmsg_type
= SCTP_SNDRCV
;
325 cmsg
->cmsg_len
= CMSG_LEN(sizeof(struct sctp_sndrcvinfo
));
326 outmessage
.msg_controllen
= cmsg
->cmsg_len
;
327 sinfo
= (struct sctp_sndrcvinfo
*)CMSG_DATA(cmsg
);
328 memset(sinfo
, 0x00, sizeof(struct sctp_sndrcvinfo
));
330 sinfo
->sinfo_flags
|= MSG_EOF
;
331 sinfo
->sinfo_assoc_id
= associd
;
333 ret
= kernel_sendmsg(sctp_con
.sock
, &outmessage
, NULL
, 0, 0);
336 log_print("send EOF to node failed: %d", ret
);
340 /* INIT failed but we don't know which node...
 * restart INIT on all pending nodes.
 * Scans nodeids 1..max_nodeid; every node that had NI_INIT_PENDING set has
 * the bit cleared and, unless it is already marked NI_WRITE_PENDING, is
 * appended to write_nodes under write_nodes_lock.  send_task is then woken
 * so the send path can try again. */
342 static void init_failed(void)
347 for (i
=1; i
<=max_nodeid
; i
++) {
348 ni
= nodeid2nodeinfo(i
, 0);
352 if (test_and_clear_bit(NI_INIT_PENDING
, &ni
->flags
)) {
354 if (!test_and_set_bit(NI_WRITE_PENDING
, &ni
->flags
)) {
355 spin_lock_bh(&write_nodes_lock
);
356 list_add_tail(&ni
->write_list
, &write_nodes
);
357 spin_unlock_bh(&write_nodes_lock
);
361 wake_up_process(send_task
);
364 /* Something happened to an association.
 *
 * @buf is interpreted as a union sctp_notification.  Only notifications of
 * type SCTP_ASSOC_CHANGE are examined; what happens depends on sac_state:
 *
 *  - association up (the case labels are not visible here; the log text
 *    calls it COMM_UP): an assoc id <= 0 is logged as invalid.  Otherwise
 *    the peer's primary address is fetched with the SCTP_PRIMARY_ADDR
 *    socket option.  If that fails the error is logged and NI_INIT_PENDING
 *    is cleared on the nodeinfo found through assoc2nodeinfo() so that INIT
 *    is retried later.  On success the address is normalised with
 *    make_sockaddr() and mapped to a nodeid with dlm_addr_to_nodeid(); an
 *    unknown address is rejected with send_shutdown().  For a known node
 *    the assoc id is stored under ni->lock, NI_INIT_PENDING is cleared, the
 *    node is queued on write_nodes (unless already pending) and send_task
 *    is woken.
 *  - SCTP_SHUTDOWN_COMP: the nodeinfo for the association is looked up and
 *    ni->lock is taken; the update made under the lock is not visible here.
 *  - SCTP_CANT_STR_ASSOC: logs that the association could not be started
 *    and that it will be retried.
 *  - anything else is logged as an unexpected state.
 *
 * @msg is not referenced by the visible lines. */
365 static void process_sctp_notification(struct msghdr
*msg
, char *buf
)
367 union sctp_notification
*sn
= (union sctp_notification
*)buf
;
369 if (sn
->sn_header
.sn_type
== SCTP_ASSOC_CHANGE
) {
370 switch (sn
->sn_assoc_change
.sac_state
) {
375 /* Check that the new node is in the lockspace */
376 struct sctp_prim prim
;
383 /* This seems to happen when we received a connection
384 * too early... or something... anyway, it happens but
385 * we always seem to get a real message too, see
386 * receive_from_sock */
388 if ((int)sn
->sn_assoc_change
.sac_assoc_id
<= 0) {
389 log_print("COMM_UP for invalid assoc ID %d",
390 (int)sn
->sn_assoc_change
.sac_assoc_id
);
394 memset(&prim
, 0, sizeof(struct sctp_prim
));
395 prim_len
= sizeof(struct sctp_prim
);
396 prim
.ssp_assoc_id
= sn
->sn_assoc_change
.sac_assoc_id
;
400 ret
= sctp_con
.sock
->ops
->getsockopt(sctp_con
.sock
,
401 IPPROTO_SCTP
, SCTP_PRIMARY_ADDR
,
402 (char*)&prim
, &prim_len
);
407 log_print("getsockopt/sctp_primary_addr on "
408 "new assoc %d failed : %d",
409 (int)sn
->sn_assoc_change
.sac_assoc_id
, ret
);
411 /* Retry INIT later */
412 ni
= assoc2nodeinfo(sn
->sn_assoc_change
.sac_assoc_id
);
414 clear_bit(NI_INIT_PENDING
, &ni
->flags
);
417 make_sockaddr(&prim
.ssp_addr
, 0, &addr_len
);
418 if (dlm_addr_to_nodeid(&prim
.ssp_addr
, &nodeid
)) {
419 log_print("reject connect from unknown addr");
420 send_shutdown(prim
.ssp_assoc_id
);
424 ni
= nodeid2nodeinfo(nodeid
, GFP_KERNEL
);
428 /* Save the assoc ID */
429 spin_lock(&ni
->lock
);
430 ni
->assoc_id
= sn
->sn_assoc_change
.sac_assoc_id
;
431 spin_unlock(&ni
->lock
);
433 log_print("got new/restarted association %d nodeid %d",
434 (int)sn
->sn_assoc_change
.sac_assoc_id
, nodeid
);
436 /* Send any pending writes */
437 clear_bit(NI_INIT_PENDING
, &ni
->flags
);
438 if (!test_and_set_bit(NI_WRITE_PENDING
, &ni
->flags
)) {
439 spin_lock_bh(&write_nodes_lock
);
440 list_add_tail(&ni
->write_list
, &write_nodes
);
441 spin_unlock_bh(&write_nodes_lock
);
443 wake_up_process(send_task
);
448 case SCTP_SHUTDOWN_COMP
:
452 ni
= assoc2nodeinfo(sn
->sn_assoc_change
.sac_assoc_id
);
454 spin_lock(&ni
->lock
);
456 spin_unlock(&ni
->lock
);
461 /* We don't know which INIT failed, so clear the PENDING flags
462 * on them all. if assoc_id is zero then it will then try
465 case SCTP_CANT_STR_ASSOC
:
467 log_print("Can't start SCTP association - retrying");
473 log_print("unexpected SCTP assoc change id=%d state=%d",
474 (int)sn
->sn_assoc_change
.sac_assoc_id
,
475 sn
->sn_assoc_change
.sac_state
);
480 /* Data received from remote end.
 *
 * Reads one message from sctp_con.sock into the page-sized circular buffer
 * (sctp_con.rx_page with state in sctp_con.cb); the page is allocated with
 * GFP_ATOMIC on first use.  iov[0] and iov[1] describe the free part(s) of
 * the ring and the read is non-blocking (MSG_DONTWAIT).
 *
 * A message flagged MSG_NOTIFICATION is passed to
 * process_sctp_notification().  For ordinary data the sender's nodeid is
 * taken from sinfo_ppid in the SCTP_SNDRCV control data: it is used to find
 * or create the nodeinfo, sinfo_assoc_id is recorded in ni->assoc_id, and a
 * pending INIT is completed by queueing the node on write_nodes and waking
 * send_task.  The received bytes are then added to the ring, handed to
 * dlm_process_incoming_buffer(), and the value it returns is consumed with
 * CBUF_EAT().  lowcomms_data_ready() is re-armed on one path and read
 * errors are logged.
 *
 * msgname and incmsg are static, so this must only run in one thread.
 *
 * NOTE(review): incmsg is cleared twice in a row, once via &incmsg and once
 * via incmsg; one memset is redundant.
 * NOTE(review): the nodeid handed to dlm_process_incoming_buffer() is
 * converted with cpu_to_le32() while the lookup above uses le32_to_cpu();
 * the two perform the same byte swap, but le32_to_cpu() is the conversion
 * that matches the direction (wire to CPU). */
481 static int receive_from_sock(void)
488 struct sctp_sndrcvinfo
*sinfo
;
489 struct cmsghdr
*cmsg
;
492 /* These two are marginally too big for stack allocation, but this
493 * function is (currently) only called by dlm_recvd so static should be
496 static struct sockaddr_storage msgname
;
497 static char incmsg
[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo
))];
499 if (sctp_con
.sock
== NULL
)
502 if (sctp_con
.rx_page
== NULL
) {
504 * This doesn't need to be atomic, but I think it should
505 * improve performance if it is.
507 sctp_con
.rx_page
= alloc_page(GFP_ATOMIC
);
508 if (sctp_con
.rx_page
== NULL
)
510 CBUF_INIT(&sctp_con
.cb
, PAGE_CACHE_SIZE
);
513 memset(&incmsg
, 0, sizeof(incmsg
));
514 memset(&msgname
, 0, sizeof(msgname
));
516 memset(incmsg
, 0, sizeof(incmsg
));
517 msg
.msg_name
= &msgname
;
518 msg
.msg_namelen
= sizeof(msgname
);
520 msg
.msg_control
= incmsg
;
521 msg
.msg_controllen
= sizeof(incmsg
);
524 /* I don't see why this circular buffer stuff is necessary for SCTP
525 * which is a packet-based protocol, but the whole thing breaks under
526 * load without it! The overhead is minimal (and is in the TCP lowcomms
527 * anyway, of course) so I'll leave it in until I can figure out what's
532 * iov[0] is the bit of the circular buffer between the current end
533 * point (cb.base + cb.len) and the end of the buffer.
535 iov
[0].iov_len
= sctp_con
.cb
.base
- CBUF_DATA(&sctp_con
.cb
);
536 iov
[0].iov_base
= page_address(sctp_con
.rx_page
) +
537 CBUF_DATA(&sctp_con
.cb
);
541 * iov[1] is the bit of the circular buffer between the start of the
542 * buffer and the start of the currently used section (cb.base)
544 if (CBUF_DATA(&sctp_con
.cb
) >= sctp_con
.cb
.base
) {
545 iov
[0].iov_len
= PAGE_CACHE_SIZE
- CBUF_DATA(&sctp_con
.cb
);
546 iov
[1].iov_len
= sctp_con
.cb
.base
;
547 iov
[1].iov_base
= page_address(sctp_con
.rx_page
);
550 len
= iov
[0].iov_len
+ iov
[1].iov_len
;
552 r
= ret
= kernel_recvmsg(sctp_con
.sock
, &msg
, iov
, msg
.msg_iovlen
, len
,
553 MSG_NOSIGNAL
| MSG_DONTWAIT
);
557 msg
.msg_control
= incmsg
;
558 msg
.msg_controllen
= sizeof(incmsg
);
559 cmsg
= CMSG_FIRSTHDR(&msg
);
560 sinfo
= (struct sctp_sndrcvinfo
*)CMSG_DATA(cmsg
);
562 if (msg
.msg_flags
& MSG_NOTIFICATION
) {
563 process_sctp_notification(&msg
, page_address(sctp_con
.rx_page
));
567 /* Is this a new association ? */
568 ni
= nodeid2nodeinfo(le32_to_cpu(sinfo
->sinfo_ppid
), GFP_KERNEL
);
570 ni
->assoc_id
= sinfo
->sinfo_assoc_id
;
571 if (test_and_clear_bit(NI_INIT_PENDING
, &ni
->flags
)) {
573 if (!test_and_set_bit(NI_WRITE_PENDING
, &ni
->flags
)) {
574 spin_lock_bh(&write_nodes_lock
);
575 list_add_tail(&ni
->write_list
, &write_nodes
);
576 spin_unlock_bh(&write_nodes_lock
);
578 wake_up_process(send_task
);
582 /* INIT sends a message with length of 1 - ignore it */
586 CBUF_ADD(&sctp_con
.cb
, ret
);
587 ret
= dlm_process_incoming_buffer(cpu_to_le32(sinfo
->sinfo_ppid
),
588 page_address(sctp_con
.rx_page
),
589 sctp_con
.cb
.base
, sctp_con
.cb
.len
,
593 CBUF_EAT(&sctp_con
.cb
, ret
);
600 lowcomms_data_ready(sctp_con
.sock
->sk
, 0);
607 log_print("error reading from sctp socket: %d", ret
);
612 /* Bind to an IP address. SCTP allows multiple addresses so it can do
 * multi-homing.
 * Two ways of adding @addr to sctp_con.sock are visible: a plain bind()
 * and the SCTP_SOCKOPT_BINDX_ADD socket option.  The test that selects
 * between them is not visible here (presumably bind() for the first
 * address and BINDX_ADD for the rest, based on @num - TODO confirm).
 * A failure is logged with the configured port and @num. */
613 static int add_bind_addr(struct sockaddr_storage
*addr
, int addr_len
, int num
)
621 result
= sctp_con
.sock
->ops
->bind(sctp_con
.sock
,
622 (struct sockaddr
*) addr
, addr_len
);
624 result
= sctp_con
.sock
->ops
->setsockopt(sctp_con
.sock
, SOL_SCTP
,
625 SCTP_SOCKOPT_BINDX_ADD
, (char *)addr
, addr_len
);
629 log_print("Can't bind to port %d addr number %d",
630 dlm_config
.tcp_port
, num
);
/*
 * Collect this node's identity: stores dlm_our_nodeid() in dlm_local_nodeid
 * and copies each address reported by dlm_our_addr() into a freshly
 * kmalloc'ed sockaddr_storage that is appended to dlm_local_addr[],
 * incrementing dlm_local_count.  The copies are freed again by
 * dlm_lowcomms_exit().
 * NOTE(review): the loop bound is DLM_MAX_ADDR_COUNT - 1, so the last slot
 * of dlm_local_addr[] is never filled - confirm whether that is intended.
 */
635 static void init_local(void)
637 struct sockaddr_storage sas
, *addr
;
640 dlm_local_nodeid
= dlm_our_nodeid();
642 for (i
= 0; i
< DLM_MAX_ADDR_COUNT
- 1; i
++) {
643 if (dlm_our_addr(&sas
, i
))
646 addr
= kmalloc(sizeof(*addr
), GFP_KERNEL
);
649 memcpy(addr
, &sas
, sizeof(*addr
));
650 dlm_local_addr
[dlm_local_count
++] = addr
;
654 /* Initialise SCTP socket and bind to all interfaces.
 *
 * Steps visible here:
 *  - dlm_local_count is checked (twice, nested) and "no local IP address
 *    has been set" is logged when it is still zero;
 *  - a SOCK_SEQPACKET / IPPROTO_SCTP kernel socket is created in the
 *    address family of dlm_local_addr[0];
 *  - the socket subscribes to data-io, association, send-failure, shutdown
 *    and partial-delivery events through SCTP_EVENTS;
 *  - sk_user_data is pointed at sctp_con, the socket is stored in
 *    sctp_con.sock and lowcomms_data_ready() becomes its sk_data_ready
 *    callback;
 *  - every local address is completed with dlm_config.tcp_port by
 *    make_sockaddr() and bound through add_bind_addr();
 *  - the socket is put into listening state with a backlog of 5.
 * Failures are logged.  sctp_con.sock is reset to NULL on a path whose
 * condition is not visible here (presumably the error clean-up). */
655 static int init_sock(void)
658 struct socket
*sock
= NULL
;
659 struct sockaddr_storage localaddr
;
660 struct sctp_event_subscribe subscribe
;
661 int result
= -EINVAL
, num
= 1, i
, addr_len
;
663 if (!dlm_local_count
) {
665 if (!dlm_local_count
) {
666 log_print("no local IP address has been set");
671 result
= sock_create_kern(dlm_local_addr
[0]->ss_family
, SOCK_SEQPACKET
,
672 IPPROTO_SCTP
, &sock
);
674 log_print("Can't create comms socket, check SCTP is loaded");
678 /* Listen for events */
679 memset(&subscribe
, 0, sizeof(subscribe
));
680 subscribe
.sctp_data_io_event
= 1;
681 subscribe
.sctp_association_event
= 1;
682 subscribe
.sctp_send_failure_event
= 1;
683 subscribe
.sctp_shutdown_event
= 1;
684 subscribe
.sctp_partial_delivery_event
= 1;
688 result
= sock
->ops
->setsockopt(sock
, SOL_SCTP
, SCTP_EVENTS
,
689 (char *)&subscribe
, sizeof(subscribe
));
693 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
698 /* Init con struct */
699 sock
->sk
->sk_user_data
= &sctp_con
;
700 sctp_con
.sock
= sock
;
701 sctp_con
.sock
->sk
->sk_data_ready
= lowcomms_data_ready
;
703 /* Bind to all interfaces. */
704 for (i
= 0; i
< dlm_local_count
; i
++) {
705 memcpy(&localaddr
, dlm_local_addr
[i
], sizeof(localaddr
));
706 make_sockaddr(&localaddr
, dlm_config
.tcp_port
, &addr_len
);
708 result
= add_bind_addr(&localaddr
, addr_len
, num
);
714 result
= sock
->ops
->listen(sock
, 5);
716 log_print("Can't set socket listening");
724 sctp_con
.sock
= NULL
;
/*
 * Allocate a writequeue_entry together with the page that will hold its
 * data.  Both allocations use the caller-supplied @allocation flags.  The
 * failure handling and the initialisation of the remaining members are not
 * visible in this copy.
 */
730 static struct writequeue_entry
*new_writequeue_entry(gfp_t allocation
)
732 struct writequeue_entry
*entry
;
734 entry
= kmalloc(sizeof(struct writequeue_entry
), allocation
);
738 entry
->page
= alloc_page(allocation
);
/*
 * Reserve @len bytes of outgoing buffer space for @nodeid and point *ppc at
 * the place where the caller should write its message.
 *
 * The "accepting" flag is read first.  The nodeinfo is then found (or
 * created, using @allocation).  Under ni->writequeue_lock the last entry of
 * ni->writequeue is examined: the test covers both "queue is empty" and
 * "fewer than @len bytes left in that entry's page"
 * (PAGE_CACHE_SIZE - e->end < len).  One path points *ppc into an existing
 * entry's page; the other obtains a fresh entry from new_writequeue_entry()
 * and appends it to the queue, again under the lock.
 *
 * Presumably the returned pointer is the entry, to be handed back to
 * dlm_lowcomms_commit_buffer() - the return statements are not visible in
 * this copy.
 */
752 void *dlm_lowcomms_get_buffer(int nodeid
, int len
, gfp_t allocation
, char **ppc
)
754 struct writequeue_entry
*e
;
759 if (!atomic_read(&accepting
))
762 ni
= nodeid2nodeinfo(nodeid
, allocation
);
766 spin_lock(&ni
->writequeue_lock
);
767 e
= list_entry(ni
->writequeue
.prev
, struct writequeue_entry
, list
);
768 if (((struct list_head
*) e
== &ni
->writequeue
) ||
769 (PAGE_CACHE_SIZE
- e
->end
< len
)) {
776 spin_unlock(&ni
->writequeue_lock
);
782 *ppc
= page_address(e
->page
) + offset
;
786 e
= new_writequeue_entry(allocation
);
788 spin_lock(&ni
->writequeue_lock
);
793 list_add_tail(&e
->list
, &ni
->writequeue
);
794 spin_unlock(&ni
->writequeue_lock
);
/*
 * Mark a buffer as filled in and ready to send.  @arg is used as a
 * struct writequeue_entry pointer; the owning node is taken from e->ni.
 *
 * The "accepting" flag is read first.  Under ni->writequeue_lock the
 * entry's sendable length is recomputed as e->end - e->offset.  After the
 * lock is dropped the node is queued on write_nodes (unless
 * NI_WRITE_PENDING was already set) and send_task is woken.  The second
 * unlock at the end belongs to a path whose condition is not visible here.
 */
800 void dlm_lowcomms_commit_buffer(void *arg
)
802 struct writequeue_entry
*e
= (struct writequeue_entry
*) arg
;
804 struct nodeinfo
*ni
= e
->ni
;
806 if (!atomic_read(&accepting
))
809 spin_lock(&ni
->writequeue_lock
);
813 e
->len
= e
->end
- e
->offset
;
815 spin_unlock(&ni
->writequeue_lock
);
817 if (!test_and_set_bit(NI_WRITE_PENDING
, &ni
->flags
)) {
818 spin_lock_bh(&write_nodes_lock
);
819 list_add_tail(&ni
->write_list
, &write_nodes
);
820 spin_unlock_bh(&write_nodes_lock
);
821 wake_up_process(send_task
);
826 spin_unlock(&ni
->writequeue_lock
);
/* Release the data page owned by a writequeue entry.  (Freeing of the entry
 * structure itself is not visible in this copy.) */
830 static void free_entry(struct writequeue_entry
*e
)
832 __free_page(e
->page
);
836 /* Initiate an SCTP association. In theory we could just use sendmsg() on
 * the first IP address and it should work, but this allows us to set up the
 * association before sending any valuable data that we can't afford to lose.
 * It also keeps the send path clean as it can now always use the association ID.
 *
 * Resolves @nodeid to an address (nodeid_to_addr() + make_sockaddr() with
 * dlm_config.tcp_port) and sends a one-byte message to it, tagged with our
 * own nodeid in sinfo_ppid (little-endian).  If the address cannot be
 * resolved this is logged.  If the send fails, NI_INIT_PENDING is cleared
 * so a later attempt can be made.
 * NOTE: outcmsg is a static buffer, so this function is not reentrant. */
840 static void initiate_association(int nodeid
)
842 struct sockaddr_storage rem_addr
;
843 static char outcmsg
[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo
))];
844 struct msghdr outmessage
;
845 struct cmsghdr
*cmsg
;
846 struct sctp_sndrcvinfo
*sinfo
;
853 log_print("Initiating association with node %d", nodeid
);
855 ni
= nodeid2nodeinfo(nodeid
, GFP_KERNEL
);
859 if (nodeid_to_addr(nodeid
, (struct sockaddr
*)&rem_addr
)) {
860 log_print("no address for nodeid %d", nodeid
);
864 make_sockaddr(&rem_addr
, dlm_config
.tcp_port
, &addrlen
);
866 outmessage
.msg_name
= &rem_addr
;
867 outmessage
.msg_namelen
= addrlen
;
868 outmessage
.msg_control
= outcmsg
;
869 outmessage
.msg_controllen
= sizeof(outcmsg
);
870 outmessage
.msg_flags
= MSG_EOR
;
872 iov
[0].iov_base
= buf
;
875 /* Real INIT messages seem to cause trouble. Just send a 1 byte message
876 we can afford to lose */
877 cmsg
= CMSG_FIRSTHDR(&outmessage
);
878 cmsg
->cmsg_level
= IPPROTO_SCTP
;
879 cmsg
->cmsg_type
= SCTP_SNDRCV
;
880 cmsg
->cmsg_len
= CMSG_LEN(sizeof(struct sctp_sndrcvinfo
));
881 sinfo
= (struct sctp_sndrcvinfo
*)CMSG_DATA(cmsg
);
882 memset(sinfo
, 0x00, sizeof(struct sctp_sndrcvinfo
));
883 sinfo
->sinfo_ppid
= cpu_to_le32(dlm_local_nodeid
);
885 outmessage
.msg_controllen
= cmsg
->cmsg_len
;
886 ret
= kernel_sendmsg(sctp_con
.sock
, &outmessage
, iov
, 1, 1);
888 log_print("send INIT to node failed: %d", ret
);
889 /* Try again later */
890 clear_bit(NI_INIT_PENDING
, &ni
->flags
);
/*
 * Send the data queued for node @ni over the SCTP socket.
 *
 * If the node has no association yet (assoc_id == 0) and no INIT is
 * already pending, initiate_association() is called first.  Otherwise an
 * SCTP_SNDRCV control message carrying our nodeid (sinfo_ppid,
 * little-endian) and the node's assoc_id is prepared, and entries are taken
 * from the head of ni->writequeue under writequeue_lock; the lock is
 * dropped around each kernel_sendmsg() call.  -EAGAIN from the socket sets
 * sctp_con.eagain_flag so that dlm_sendd refills the write list later.
 * After a send the lock is retaken and the entry is tested for
 * len == 0 && users == 0, i.e. fully sent and no longer being filled.
 * On a send error the failure is logged and, unless an INIT is already
 * pending, a new association is initiated.
 * NOTE: outcmsg is a static buffer, so this function is not reentrant.
 */
895 static int send_to_sock(struct nodeinfo
*ni
)
898 struct writequeue_entry
*e
;
900 struct msghdr outmsg
;
901 static char outcmsg
[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo
))];
902 struct cmsghdr
*cmsg
;
903 struct sctp_sndrcvinfo
*sinfo
;
906 /* See if we need to init an association before we start
907 sending precious messages */
908 spin_lock(&ni
->lock
);
909 if (!ni
->assoc_id
&& !test_and_set_bit(NI_INIT_PENDING
, &ni
->flags
)) {
910 spin_unlock(&ni
->lock
);
911 initiate_association(ni
->nodeid
);
914 spin_unlock(&ni
->lock
);
916 outmsg
.msg_name
= NULL
; /* We use assoc_id */
917 outmsg
.msg_namelen
= 0;
918 outmsg
.msg_control
= outcmsg
;
919 outmsg
.msg_controllen
= sizeof(outcmsg
);
920 outmsg
.msg_flags
= MSG_DONTWAIT
| MSG_NOSIGNAL
| MSG_EOR
;
922 cmsg
= CMSG_FIRSTHDR(&outmsg
);
923 cmsg
->cmsg_level
= IPPROTO_SCTP
;
924 cmsg
->cmsg_type
= SCTP_SNDRCV
;
925 cmsg
->cmsg_len
= CMSG_LEN(sizeof(struct sctp_sndrcvinfo
));
926 sinfo
= (struct sctp_sndrcvinfo
*)CMSG_DATA(cmsg
);
927 memset(sinfo
, 0x00, sizeof(struct sctp_sndrcvinfo
));
928 sinfo
->sinfo_ppid
= cpu_to_le32(dlm_local_nodeid
);
929 sinfo
->sinfo_assoc_id
= ni
->assoc_id
;
930 outmsg
.msg_controllen
= cmsg
->cmsg_len
;
932 spin_lock(&ni
->writequeue_lock
);
934 if (list_empty(&ni
->writequeue
))
936 e
= list_entry(ni
->writequeue
.next
, struct writequeue_entry
,
940 BUG_ON(len
== 0 && e
->users
== 0);
941 spin_unlock(&ni
->writequeue_lock
);
946 iov
.iov_base
= page_address(e
->page
)+offset
;
949 ret
= kernel_sendmsg(sctp_con
.sock
, &outmsg
, &iov
, 1,
951 if (ret
== -EAGAIN
) {
952 sctp_con
.eagain_flag
= 1;
957 /* Don't starve people filling buffers */
961 spin_lock(&ni
->writequeue_lock
);
965 if (e
->len
== 0 && e
->users
== 0) {
971 spin_unlock(&ni
->writequeue_lock
);
976 log_print("Error sending to node %d %d", ni
->nodeid
, ret
);
977 spin_lock(&ni
->lock
);
978 if (!test_and_set_bit(NI_INIT_PENDING
, &ni
->flags
)) {
980 spin_unlock(&ni
->lock
);
981 initiate_association(ni
->nodeid
);
983 spin_unlock(&ni
->lock
);
988 /* Try to send any messages that are pending.
 * Walks write_nodes with the deletion-safe iterator.  Each node has
 * NI_WRITE_PENDING cleared and is unlinked from the list while
 * write_nodes_lock is held; the lock is then dropped and retaken inside the
 * loop body (the call made while it is dropped is not visible in this copy;
 * presumably send_to_sock() - TODO confirm) and released for good after the
 * loop. */
989 static void process_output_queue(void)
991 struct list_head
*list
;
992 struct list_head
*temp
;
994 spin_lock_bh(&write_nodes_lock
);
995 list_for_each_safe(list
, temp
, &write_nodes
) {
996 struct nodeinfo
*ni
=
997 list_entry(list
, struct nodeinfo
, write_list
);
998 clear_bit(NI_WRITE_PENDING
, &ni
->flags
);
999 list_del(&ni
->write_list
);
1001 spin_unlock_bh(&write_nodes_lock
);
1004 spin_lock_bh(&write_nodes_lock
);
1006 spin_unlock_bh(&write_nodes_lock
);
1009 /* Called after we've had -EAGAIN and been woken up.
 * Scans nodeids 1..max_nodeid and puts nodes back on write_nodes (under
 * write_nodes_lock) unless they are already marked NI_WRITE_PENDING.  Any
 * additional per-node filter between the lookup and the flag test is not
 * visible in this copy. */
1010 static void refill_write_queue(void)
1014 for (i
=1; i
<=max_nodeid
; i
++) {
1015 struct nodeinfo
*ni
= nodeid2nodeinfo(i
, 0);
1018 if (!test_and_set_bit(NI_WRITE_PENDING
, &ni
->flags
)) {
1019 spin_lock_bh(&write_nodes_lock
);
1020 list_add_tail(&ni
->write_list
, &write_nodes
);
1021 spin_unlock_bh(&write_nodes_lock
);
/*
 * Walk every entry on @ni's writequeue with the deletion-safe iterator
 * while holding ni->writequeue_lock.  What is done with each entry is not
 * visible in this copy (presumably it is unlinked and freed - TODO confirm).
 */
1027 static void clean_one_writequeue(struct nodeinfo
*ni
)
1029 struct list_head
*list
;
1030 struct list_head
*temp
;
1032 spin_lock(&ni
->writequeue_lock
);
1033 list_for_each_safe(list
, temp
, &ni
->writequeue
) {
1034 struct writequeue_entry
*e
=
1035 list_entry(list
, struct writequeue_entry
, list
);
1039 spin_unlock(&ni
->writequeue_lock
);
/* Run clean_one_writequeue() over the nodes with ids 1..max_nodeid, looking
 * each one up with nodeid2nodeinfo(i, 0). */
1042 static void clean_writequeues(void)
1046 for (i
=1; i
<=max_nodeid
; i
++) {
1047 struct nodeinfo
*ni
= nodeid2nodeinfo(i
, 0);
1049 clean_one_writequeue(ni
);
/* Walk nodeids 1..max_nodeid, look each one up with alloc == 0 and remove it
 * from nodeinfo_idr.  Freeing of the nodeinfo structure itself is not
 * visible in this copy. */
1054 static void dealloc_nodeinfo(void)
1058 for (i
=1; i
<=max_nodeid
; i
++) {
1059 struct nodeinfo
*ni
= nodeid2nodeinfo(i
, 0);
1061 idr_remove(&nodeinfo_idr
, i
);
/*
 * Drop the communication state kept for @nodeid.  The node is looked up
 * with alloc == 0; ni->lock is held around a step that is not visible in
 * this copy, after which the node's writequeue is cleaned and
 * NI_INIT_PENDING is cleared.  No shutdown message is sent, for the reason
 * given in the comment inside.
 */
1067 int dlm_lowcomms_close(int nodeid
)
1069 struct nodeinfo
*ni
;
1071 ni
= nodeid2nodeinfo(nodeid
, 0);
1075 spin_lock(&ni
->lock
);
1078 /* Don't send shutdown here, sctp will just queue it
1079 till the node comes back up! */
1081 spin_unlock(&ni
->lock
);
1083 clean_one_writequeue(ni
);
1084 clear_bit(NI_INIT_PENDING
, &ni
->flags
);
/* Sample list_empty(&write_nodes) into status while holding
 * write_nodes_lock (taken with the _bh variant, as everywhere else). */
1088 static int write_list_empty(void)
1092 spin_lock_bh(&write_nodes_lock
);
1093 status
= list_empty(&write_nodes
);
1094 spin_unlock_bh(&write_nodes_lock
);
/*
 * Body of the receive thread (started by daemons_start() under the name
 * "dlm_recvd").  Until kthread_should_stop():
 *  - goes TASK_INTERRUPTIBLE on lowcomms_recv_wait and tests
 *    CF_READ_PENDING (the sleep itself is on a line not visible here);
 *  - once CF_READ_PENDING can be test-and-cleared, calls
 *    receive_from_sock() in a loop for as long as it returns >= 0 and the
 *    thread is not asked to stop, counting iterations against
 *    MAX_RX_MSG_COUNT so that other work is not starved.
 * @data is not used by the visible lines.
 */
1099 static int dlm_recvd(void *data
)
1101 DECLARE_WAITQUEUE(wait
, current
);
1103 while (!kthread_should_stop()) {
1106 set_current_state(TASK_INTERRUPTIBLE
);
1107 add_wait_queue(&lowcomms_recv_wait
, &wait
);
1108 if (!test_bit(CF_READ_PENDING
, &sctp_con
.flags
))
1110 remove_wait_queue(&lowcomms_recv_wait
, &wait
);
1111 set_current_state(TASK_RUNNING
);
1113 if (test_and_clear_bit(CF_READ_PENDING
, &sctp_con
.flags
)) {
1117 ret
= receive_from_sock();
1119 /* Don't starve out everyone else */
1120 if (++count
>= MAX_RX_MSG_COUNT
) {
1124 } while (!kthread_should_stop() && ret
>=0);
/*
 * Body of the send thread (started by daemons_start() under the name
 * "dlm_sendd").  The thread stays registered on the socket's sk_sleep wait
 * queue for the whole loop.  Each iteration goes TASK_INTERRUPTIBLE and
 * tests write_list_empty() (the sleep itself is on a line not visible
 * here), then, back in TASK_RUNNING, refills the write list if a previous
 * send hit -EAGAIN (eagain_flag) and runs process_output_queue().
 * @data is not used by the visible lines.
 */
1132 static int dlm_sendd(void *data
)
1134 DECLARE_WAITQUEUE(wait
, current
);
1136 add_wait_queue(sctp_con
.sock
->sk
->sk_sleep
, &wait
);
1138 while (!kthread_should_stop()) {
1139 set_current_state(TASK_INTERRUPTIBLE
);
1140 if (write_list_empty())
1142 set_current_state(TASK_RUNNING
);
1144 if (sctp_con
.eagain_flag
) {
1145 sctp_con
.eagain_flag
= 0;
1146 refill_write_queue();
1148 process_output_queue();
1151 remove_wait_queue(sctp_con
.sock
->sk
->sk_sleep
, &wait
);
/* Stop both kernel threads with kthread_stop(): the receive thread first,
 * then the send thread. */
1156 static void daemons_stop(void)
1158 kthread_stop(recv_task
);
1159 kthread_stop(send_task
);
/*
 * Start the two kernel threads with kthread_run(): dlm_recvd first, then
 * dlm_sendd.  A failure to start either one is logged; if dlm_sendd cannot
 * be started the already running receive thread is stopped again.  The
 * assignments to recv_task/send_task and the error extraction are on lines
 * not visible in this copy.
 */
1162 static int daemons_start(void)
1164 struct task_struct
*p
;
1167 p
= kthread_run(dlm_recvd
, NULL
, "dlm_recvd");
1170 log_print("can't start dlm_recvd %d", error
);
1175 p
= kthread_run(dlm_sendd
, NULL
, "dlm_sendd");
1178 log_print("can't start dlm_sendd %d", error
);
1179 kthread_stop(recv_task
);
/* Bring the comms layer up: create and bind the socket (init_sock()), start
 * the daemons (daemons_start()) and finally set "accepting" to 1 so that
 * buffers may be requested.  The error checks between the steps are not
 * visible in this copy. */
1188 * This is quite likely to sleep...
1190 int dlm_lowcomms_start(void)
1194 error
= init_sock();
1197 error
= daemons_start();
1200 atomic_set(&accepting
, 1);
1208 /* Set all the activity flags to prevent any socket activity.
 * Clears "accepting" so no new buffers are handed out or committed, sets
 * the low three bits of sctp_con.flags (0x7) and discards whatever is still
 * queued with clean_writequeues().  Further teardown steps, if any, are not
 * visible in this copy. */
1210 void dlm_lowcomms_stop(void)
1212 atomic_set(&accepting
, 0);
1213 sctp_con
.flags
= 0x7;
1215 clean_writequeues();
/* One-time initialisation of the file-scope synchronisation objects: the
 * receive wait queue, the write_nodes list with its spinlock, and the
 * nodeinfo rw-semaphore. */
1221 int dlm_lowcomms_init(void)
1223 init_waitqueue_head(&lowcomms_recv_wait
);
1224 spin_lock_init(&write_nodes_lock
);
1225 INIT_LIST_HEAD(&write_nodes
);
1226 init_rwsem(&nodeinfo_lock
);
/* Undo init_local(): free every address copy held in dlm_local_addr[] and
 * reset dlm_local_count and dlm_local_nodeid to 0.  The freed pointers are
 * left in the array, but with the count at 0 they are no longer considered
 * valid. */
1230 void dlm_lowcomms_exit(void)
1234 for (i
= 0; i
< dlm_local_count
; i
++)
1235 kfree(dlm_local_addr
[i
]);
1236 dlm_local_count
= 0;
1237 dlm_local_nodeid
= 0;