4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Ronnie Sahlberg 2008
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/network.h"
23 #include "system/filesys.h"
28 #include "lib/util/debug.h"
29 #include "lib/util/time.h"
30 #include "lib/util/blocking.h"
32 #include "ctdb_private.h"
34 #include "common/system.h"
35 #include "common/common.h"
36 #include "common/logging.h"
37 #include "common/path.h"
39 #include "protocol/protocol_util.h"
44 stop any outgoing connection (established or pending) to a node
46 void ctdb_tcp_stop_outgoing(struct ctdb_node
*node
)
48 struct ctdb_tcp_node
*tnode
= talloc_get_type(
49 node
->transport_data
, struct ctdb_tcp_node
);
51 TALLOC_FREE(tnode
->out_queue
);
52 TALLOC_FREE(tnode
->connect_te
);
53 TALLOC_FREE(tnode
->connect_fde
);
54 if (tnode
->out_fd
!= -1) {
61 stop incoming connection to a node
63 void ctdb_tcp_stop_incoming(struct ctdb_node
*node
)
65 struct ctdb_tcp_node
*tnode
= talloc_get_type(
66 node
->transport_data
, struct ctdb_tcp_node
);
68 TALLOC_FREE(tnode
->in_queue
);
72 called when a complete packet has come in - should not happen on this socket
73 unless the other side closes the connection with RST or FIN
75 void ctdb_tcp_tnode_cb(uint8_t *data
, size_t cnt
, void *private_data
)
77 struct ctdb_node
*node
= talloc_get_type(private_data
, struct ctdb_node
);
79 node
->ctdb
->upcalls
->node_dead(node
);
85 called when socket becomes writeable on connect
87 static void ctdb_node_connect_write(struct tevent_context
*ev
,
88 struct tevent_fd
*fde
,
89 uint16_t flags
, void *private_data
)
91 struct ctdb_node
*node
= talloc_get_type(private_data
,
93 struct ctdb_tcp_node
*tnode
= talloc_get_type(node
->transport_data
,
94 struct ctdb_tcp_node
);
95 struct ctdb_context
*ctdb
= node
->ctdb
;
97 socklen_t len
= sizeof(error
);
101 TALLOC_FREE(tnode
->connect_te
);
103 ret
= getsockopt(tnode
->out_fd
, SOL_SOCKET
, SO_ERROR
, &error
, &len
);
104 if (ret
!= 0 || error
!= 0) {
105 ctdb_tcp_stop_outgoing(node
);
106 tnode
->connect_te
= tevent_add_timer(ctdb
->ev
, tnode
,
107 timeval_current_ofs(1, 0),
108 ctdb_tcp_node_connect
, node
);
112 TALLOC_FREE(tnode
->connect_fde
);
114 ret
= setsockopt(tnode
->out_fd
,
120 DBG_WARNING("Failed to set TCP_NODELAY on fd - %s\n",
123 ret
= setsockopt(tnode
->out_fd
,
125 SO_KEEPALIVE
,(char *)&one
,
128 DBG_WARNING("Failed to set KEEPALIVE on fd - %s\n",
132 tnode
->out_queue
= ctdb_queue_setup(node
->ctdb
,
140 if (tnode
->out_queue
== NULL
) {
141 DBG_ERR("Failed to set up outgoing queue\n");
142 ctdb_tcp_stop_outgoing(node
);
143 tnode
->connect_te
= tevent_add_timer(ctdb
->ev
,
145 timeval_current_ofs(1, 0),
146 ctdb_tcp_node_connect
,
151 /* the queue subsystem now owns this fd */
155 * Mark the node to which this connection has been established
156 * as connected, but only if the corresponding listening
157 * socket is also connected
159 if (tnode
->in_queue
!= NULL
) {
160 node
->ctdb
->upcalls
->node_connected(node
);
165 static void ctdb_tcp_node_connect_timeout(struct tevent_context
*ev
,
166 struct tevent_timer
*te
,
171 called when we should try and establish a tcp connection to a node
173 static void ctdb_tcp_start_outgoing(struct ctdb_node
*node
)
175 struct ctdb_tcp_node
*tnode
= talloc_get_type(node
->transport_data
,
176 struct ctdb_tcp_node
);
177 struct ctdb_context
*ctdb
= node
->ctdb
;
178 ctdb_sock_addr sock_in
;
181 ctdb_sock_addr sock_out
;
184 sock_out
= node
->address
;
186 tnode
->out_fd
= socket(sock_out
.sa
.sa_family
, SOCK_STREAM
, IPPROTO_TCP
);
187 if (tnode
->out_fd
== -1) {
188 DBG_ERR("Failed to create socket\n");
192 ret
= set_blocking(tnode
->out_fd
, false);
194 DBG_ERR("Failed to set socket non-blocking (%s)\n",
199 set_close_on_exec(tnode
->out_fd
);
201 DBG_DEBUG("Created TCP SOCKET FD:%d\n", tnode
->out_fd
);
203 /* Bind our end of the connection to the same address we use to listen
204 * for incoming CTDB traffic.
205 * We must specify this address to make sure that the address we expose to
206 * the remote side is actually routable in case CTDB traffic runs on
207 * a dedicated non-routable network.
209 sock_in
= *ctdb
->address
;
211 /* AIX libs check to see if the socket address and length
212 arguments are consistent with each other on calls like
213 connect(). We cannot get by with just sizeof(sock_in); we
214 need sizeof(sock_in.ip).
216 switch (sock_in
.sa
.sa_family
) {
218 sock_in
.ip
.sin_port
= 0 /* Any port */;
219 sockin_size
= sizeof(sock_in
.ip
);
220 sockout_size
= sizeof(sock_out
.ip
);
223 sock_in
.ip6
.sin6_port
= 0 /* Any port */;
224 sockin_size
= sizeof(sock_in
.ip6
);
225 sockout_size
= sizeof(sock_out
.ip6
);
228 DBG_ERR("Unknown address family %u\n", sock_in
.sa
.sa_family
);
229 /* Can't happen due to address parsing restrictions */
233 ret
= bind(tnode
->out_fd
, (struct sockaddr
*)&sock_in
, sockin_size
);
235 DBG_ERR("Failed to bind socket (%s)\n", strerror(errno
));
239 ret
= connect(tnode
->out_fd
,
240 (struct sockaddr
*)&sock_out
,
242 if (ret
!= 0 && errno
!= EINPROGRESS
) {
246 /* non-blocking connect - wait for write event */
247 tnode
->connect_fde
= tevent_add_fd(node
->ctdb
->ev
,
250 TEVENT_FD_WRITE
|TEVENT_FD_READ
,
251 ctdb_node_connect_write
,
254 /* Don't give it long to connect - retry in one second. This ensures
255 that we find out quickly when a node is up (TCP normally backs off SYN
256 retries by quite a lot) */
257 tnode
->connect_te
= tevent_add_timer(ctdb
->ev
,
259 timeval_current_ofs(1, 0),
260 ctdb_tcp_node_connect_timeout
,
266 ctdb_tcp_stop_outgoing(node
);
267 tnode
->connect_te
= tevent_add_timer(ctdb
->ev
,
269 timeval_current_ofs(1, 0),
270 ctdb_tcp_node_connect
,
274 void ctdb_tcp_node_connect(struct tevent_context
*ev
,
275 struct tevent_timer
*te
,
279 struct ctdb_node
*node
= talloc_get_type_abort(private_data
,
282 ctdb_tcp_start_outgoing(node
);
285 static void ctdb_tcp_node_connect_timeout(struct tevent_context
*ev
,
286 struct tevent_timer
*te
,
290 struct ctdb_node
*node
= talloc_get_type_abort(private_data
,
293 ctdb_tcp_stop_outgoing(node
);
294 ctdb_tcp_start_outgoing(node
);
298 called when we get contacted by another node
299 currently makes no attempt to check if the connection is really from a ctdb
302 static void ctdb_listen_event(struct tevent_context
*ev
, struct tevent_fd
*fde
,
303 uint16_t flags
, void *private_data
)
305 struct ctdb_context
*ctdb
= talloc_get_type(private_data
, struct ctdb_context
);
306 struct ctdb_tcp
*ctcp
= talloc_get_type(ctdb
->transport_data
,
311 struct ctdb_node
*node
;
312 struct ctdb_tcp_node
*tnode
;
316 memset(&addr
, 0, sizeof(addr
));
318 fd
= accept(ctcp
->listen_fd
, (struct sockaddr
*)&addr
, &len
);
319 if (fd
== -1) return;
320 smb_set_close_on_exec(fd
);
322 node
= ctdb_ip_to_node(ctdb
, &addr
);
324 char *t
= ctdb_sock_addr_to_string(ctcp
, &addr
, true);
326 DBG_ERR("Refused connection from unparsable node\n");
330 D_ERR("Refused connection from unknown node %s\n", t
);
335 tnode
= talloc_get_type_abort(node
->transport_data
,
336 struct ctdb_tcp_node
);
338 /* This can't happen - see ctdb_tcp_initialise() */
339 DBG_ERR("INTERNAL ERROR setting up connection from node %s\n",
344 if (tnode
->in_queue
!= NULL
) {
345 DBG_ERR("Incoming queue active, rejecting connection from %s\n",
350 ret
= set_blocking(fd
, false);
352 DBG_ERR("Failed to set socket non-blocking (%s)\n",
357 set_close_on_exec(fd
);
359 DBG_DEBUG("Created SOCKET FD:%d to incoming ctdb connection\n", fd
);
367 DBG_WARNING("Failed to set KEEPALIVE on fd - %s\n",
371 tnode
->in_queue
= ctdb_queue_setup(ctdb
,
379 if (tnode
->in_queue
== NULL
) {
380 DBG_ERR("Failed to set up incoming queue\n");
385 * Mark the connecting node as connected, but only if the
386 * corresponding outbound connection is also up
388 if (tnode
->out_queue
!= NULL
) {
389 node
->ctdb
->upcalls
->node_connected(node
);
398 static int ctdb_tcp_listen_addr(struct ctdb_context
*ctdb
,
399 ctdb_sock_addr
*addr
,
402 struct ctdb_tcp
*ctcp
= talloc_get_type_abort(
403 ctdb
->transport_data
, struct ctdb_tcp
);
407 struct tevent_fd
*fde
= NULL
;
411 ctcp
->listen_fd
= -1;
413 switch (sock
.sa
.sa_family
) {
415 sock_size
= sizeof(sock
.ip
);
418 sock_size
= sizeof(sock
.ip6
);
421 DBG_ERR("Unknown family %u\n", sock
.sa
.sa_family
);
425 ctcp
->listen_fd
= socket(sock
.sa
.sa_family
, SOCK_STREAM
, IPPROTO_TCP
);
426 if (ctcp
->listen_fd
== -1) {
427 DBG_ERR("Socket failed - %s (%d)\n", strerror(errno
), errno
);
431 set_close_on_exec(ctcp
->listen_fd
);
433 ret
= setsockopt(ctcp
->listen_fd
,
439 DBG_WARNING("Failed to set REUSEADDR on fd - %s (%d)\n",
444 ret
=bind(ctcp
->listen_fd
, (struct sockaddr
* )&sock
, sock_size
);
446 if (strict
|| errno
!= EADDRNOTAVAIL
) {
447 DBG_ERR("Failed to bind() to socket - %s (%d)\n",
451 DBG_DEBUG("Failed to bind() to socket - %s (%d)\n",
458 ret
= listen(ctcp
->listen_fd
, 10);
460 DBG_ERR("Failed to listen() on socket - %s (%d)\n",
466 fde
= tevent_add_fd(ctdb
->ev
,
472 tevent_fd_set_auto_close(fde
);
477 if (ctcp
->listen_fd
!= -1) {
478 close(ctcp
->listen_fd
);
479 ctcp
->listen_fd
= -1;
485 automatically find which address to listen on
487 static int ctdb_tcp_listen_automatic(struct ctdb_context
*ctdb
)
491 char *lock_path
= NULL
;
496 * If there are no nodes, then it won't be possible to find
497 * the first one. Log a failure and short circuit the whole
500 if (ctdb
->num_nodes
== 0) {
501 DEBUG(DEBUG_CRIT
,("No nodes available to attempt bind to - is the nodes file empty?\n"));
506 * In order to ensure that we don't get two nodes with the
507 * same address, we must make the bind() and listen() calls
508 * atomic. The SO_REUSEADDR setsockopt only prevents double
509 * binds if the first socket is in LISTEN state.
511 lock_path
= path_rundir_append(ctdb
, ".socket_lock");
512 if (lock_path
== NULL
) {
513 DBG_ERR("Memory allocation error\n");
516 lock_fd
= open(lock_path
, O_RDWR
|O_CREAT
, 0666);
518 DBG_ERR("Unable to open %s\n", lock_path
);
519 talloc_free(lock_path
);
523 lock
.l_type
= F_WRLCK
;
524 lock
.l_whence
= SEEK_SET
;
529 if (fcntl(lock_fd
, F_SETLKW
, &lock
) != 0) {
530 DBG_ERR("Unable to lock %s\n", lock_path
);
532 talloc_free(lock_path
);
535 talloc_free(lock_path
);
537 for (i
=0; i
< ctdb
->num_nodes
; i
++) {
538 if (ctdb
->nodes
[i
]->flags
& NODE_FLAGS_DELETED
) {
542 ret
= ctdb_tcp_listen_addr(ctdb
,
543 &ctdb
->nodes
[i
]->address
,
550 if (i
== ctdb
->num_nodes
) {
551 D_ERR("Unable to bind to any node address - giving up\n");
555 ctdb
->address
= talloc_memdup(ctdb
,
556 &ctdb
->nodes
[i
]->address
,
557 sizeof(ctdb_sock_addr
));
558 if (ctdb
->address
== NULL
) {
559 DBG_ERR("Memory allocation error\n");
563 ctdb
->name
= talloc_strdup(ctdb
, ctdb
->nodes
[i
]->name
);
564 if (ctdb
->name
== NULL
) {
565 DBG_ERR("Memory allocation error\n");
569 D_INFO("ctdb chose network address %s\n", ctdb
->name
);
575 listen on our own address
577 int ctdb_tcp_listen(struct ctdb_context
*ctdb
)
581 /* we can either auto-bind to the first available address, or we can
582 use a specified address */
583 if (!ctdb
->address
) {
584 ret
= ctdb_tcp_listen_automatic(ctdb
);
588 ret
= ctdb_tcp_listen_addr(ctdb
, ctdb
->address
, true);