/*
   ctdb main protocol code

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "system/network.h"
#include "system/filesys.h"

#include <stdlib.h>

#include "lib/util/dlinklist.h"
#include "lib/util/debug.h"
#include "lib/util/samba_util.h"

#include "ctdb_private.h"
#include "ctdb_client.h"

#include "protocol/protocol.h"

#include "common/common.h"
#include "common/logging.h"

#include "conf/node.h"
42 choose the transport we will use
44 int ctdb_set_transport(struct ctdb_context
*ctdb
, const char *transport
)
46 ctdb
->transport
= talloc_strdup(ctdb
, transport
);
47 if (ctdb
->transport
== NULL
) {
48 DBG_ERR("Memory allocation error\n");
55 /* Return the node structure for nodeip, NULL if nodeip is invalid */
56 struct ctdb_node
*ctdb_ip_to_node(struct ctdb_context
*ctdb
,
57 const ctdb_sock_addr
*nodeip
)
61 for (nodeid
=0;nodeid
<ctdb
->num_nodes
;nodeid
++) {
62 if (ctdb
->nodes
[nodeid
]->flags
& NODE_FLAGS_DELETED
) {
65 if (ctdb_same_ip(&ctdb
->nodes
[nodeid
]->address
, nodeip
)) {
66 return ctdb
->nodes
[nodeid
];
73 /* Return the PNN for nodeip, CTDB_UNKNOWN_PNN if nodeip is invalid */
74 uint32_t ctdb_ip_to_pnn(struct ctdb_context
*ctdb
,
75 const ctdb_sock_addr
*nodeip
)
77 struct ctdb_node
*node
;
79 node
= ctdb_ip_to_node(ctdb
, nodeip
);
81 return CTDB_UNKNOWN_PNN
;
87 /* Load a nodes list file into a nodes array */
88 static int convert_node_map_to_list(struct ctdb_context
*ctdb
,
90 struct ctdb_node_map
*node_map
,
91 struct ctdb_node
***nodes
,
96 *nodes
= talloc_zero_array(mem_ctx
,
97 struct ctdb_node
*, node_map
->num
);
99 DBG_ERR("Memory allocation error\n");
102 *num_nodes
= node_map
->num
;
104 for (i
= 0; i
< node_map
->num
; i
++) {
105 struct ctdb_node
*node
;
107 node
= talloc_zero(*nodes
, struct ctdb_node
);
109 DBG_ERR("Memory allocation error\n");
115 node
->address
= node_map
->node
[i
].addr
;
116 node
->name
= talloc_asprintf(node
, "%s:%u",
117 ctdb_addr_to_str(&node
->address
),
118 ctdb_addr_to_port(&node
->address
));
119 if (node
->name
== NULL
) {
120 DBG_ERR("Memory allocation error\n");
125 node
->flags
= node_map
->node
[i
].flags
;
126 if (!(node
->flags
& NODE_FLAGS_DELETED
)) {
127 node
->flags
= NODE_FLAGS_UNHEALTHY
;
129 node
->flags
|= NODE_FLAGS_DISCONNECTED
;
133 node
->dead_count
= 0;
139 /* Load the nodes list from a file or sub-processes' stdout */
140 void ctdb_load_nodes(struct ctdb_context
*ctdb
)
142 struct ctdb_node_map
*node_map
;
145 node_map
= ctdb_read_nodes(ctdb
, ctdb
->nodes_source
);
146 if (node_map
== NULL
) {
150 TALLOC_FREE(ctdb
->nodes
);
151 ret
= convert_node_map_to_list(ctdb
, ctdb
, node_map
,
152 &ctdb
->nodes
, &ctdb
->num_nodes
);
157 talloc_free(node_map
);
161 DEBUG(DEBUG_ERR
, ("Failed to load nodes \"%s\"\n",
162 ctdb
->nodes_source
));
163 talloc_free(node_map
);
168 setup the local node address
170 int ctdb_set_address(struct ctdb_context
*ctdb
, const char *address
)
174 ctdb
->address
= talloc(ctdb
, ctdb_sock_addr
);
175 if (ctdb
->address
== NULL
) {
176 DBG_ERR("Memory allocation error\n");
180 ok
= ctdb_parse_node_address(address
, ctdb
->address
);
182 DBG_ERR("Failed to parse node address\n");
183 TALLOC_FREE(ctdb
->address
);
187 ctdb
->name
= talloc_asprintf(ctdb
, "%s:%u",
188 ctdb_addr_to_str(ctdb
->address
),
189 ctdb_addr_to_port(ctdb
->address
));
190 if (ctdb
->name
== NULL
) {
191 DBG_ERR("Memory allocation error\n");
192 TALLOC_FREE(ctdb
->address
);
201 return the number of active nodes
203 uint32_t ctdb_get_num_active_nodes(struct ctdb_context
*ctdb
)
207 for (i
=0; i
< ctdb
->num_nodes
; i
++) {
208 if (!(ctdb
->nodes
[i
]->flags
& NODE_FLAGS_INACTIVE
)) {
217 called when we need to process a packet. This can be a requeued packet
218 after a lockwait, or a real packet from another node
220 void ctdb_input_pkt(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
224 /* place the packet as a child of the tmp_ctx. We then use
225 talloc_free() below to free it. If any of the calls want
226 to keep it, then they will steal it somewhere else, and the
227 talloc_free() will only free the tmp_ctx */
228 tmp_ctx
= talloc_new(ctdb
);
229 talloc_steal(tmp_ctx
, hdr
);
231 DEBUG(DEBUG_DEBUG
,(__location__
" ctdb request %u of type %u length %u from "
232 "node %u to %u\n", hdr
->reqid
, hdr
->operation
, hdr
->length
,
233 hdr
->srcnode
, hdr
->destnode
));
235 switch (hdr
->operation
) {
237 case CTDB_REPLY_CALL
:
238 case CTDB_REQ_DMASTER
:
239 case CTDB_REPLY_DMASTER
:
240 /* we don't allow these calls when banned */
241 if (ctdb
->nodes
[ctdb
->pnn
]->flags
& NODE_FLAGS_BANNED
) {
242 DEBUG(DEBUG_DEBUG
,(__location__
" ctdb operation %u"
244 " length %u from node %u to %u while node"
246 hdr
->operation
, hdr
->reqid
,
248 hdr
->srcnode
, hdr
->destnode
));
252 /* for ctdb_call inter-node operations verify that the
253 remote node that sent us the call is running in the
254 same generation instance as this node
256 if (ctdb
->vnn_map
->generation
!= hdr
->generation
) {
257 DEBUG(DEBUG_DEBUG
,(__location__
" ctdb operation %u"
259 " length %u from node %u to %u had an"
260 " invalid generation id:%u while our"
261 " generation id is:%u\n",
262 hdr
->operation
, hdr
->reqid
,
264 hdr
->srcnode
, hdr
->destnode
,
265 hdr
->generation
, ctdb
->vnn_map
->generation
));
270 switch (hdr
->operation
) {
272 CTDB_INCREMENT_STAT(ctdb
, node
.req_call
);
273 ctdb_request_call(ctdb
, hdr
);
276 case CTDB_REPLY_CALL
:
277 CTDB_INCREMENT_STAT(ctdb
, node
.reply_call
);
278 ctdb_reply_call(ctdb
, hdr
);
281 case CTDB_REPLY_ERROR
:
282 CTDB_INCREMENT_STAT(ctdb
, node
.reply_error
);
283 ctdb_reply_error(ctdb
, hdr
);
286 case CTDB_REQ_DMASTER
:
287 CTDB_INCREMENT_STAT(ctdb
, node
.req_dmaster
);
288 ctdb_request_dmaster(ctdb
, hdr
);
291 case CTDB_REPLY_DMASTER
:
292 CTDB_INCREMENT_STAT(ctdb
, node
.reply_dmaster
);
293 ctdb_reply_dmaster(ctdb
, hdr
);
296 case CTDB_REQ_MESSAGE
:
297 CTDB_INCREMENT_STAT(ctdb
, node
.req_message
);
298 ctdb_request_message(ctdb
, hdr
);
301 case CTDB_REQ_CONTROL
:
302 CTDB_INCREMENT_STAT(ctdb
, node
.req_control
);
303 ctdb_request_control(ctdb
, hdr
);
306 case CTDB_REPLY_CONTROL
:
307 CTDB_INCREMENT_STAT(ctdb
, node
.reply_control
);
308 ctdb_reply_control(ctdb
, hdr
);
311 case CTDB_REQ_KEEPALIVE
:
312 CTDB_INCREMENT_STAT(ctdb
, keepalive_packets_recv
);
313 ctdb_request_keepalive(ctdb
, hdr
);
316 case CTDB_REQ_TUNNEL
:
317 CTDB_INCREMENT_STAT(ctdb
, node
.req_tunnel
);
318 ctdb_request_tunnel(ctdb
, hdr
);
322 DEBUG(DEBUG_CRIT
,("%s: Packet with unknown operation %u\n",
323 __location__
, hdr
->operation
));
328 talloc_free(tmp_ctx
);
333 called by the transport layer when a node is dead
335 void ctdb_node_dead(struct ctdb_node
*node
)
337 if (node
->ctdb
->methods
== NULL
) {
338 DBG_ERR("Can not restart transport while shutting down\n");
341 node
->ctdb
->methods
->restart(node
);
343 if (node
->flags
& NODE_FLAGS_DISCONNECTED
) {
344 DEBUG(DEBUG_INFO
,("%s: node %s is already marked disconnected: %u connected\n",
345 node
->ctdb
->name
, node
->name
,
346 node
->ctdb
->num_connected
));
349 node
->ctdb
->num_connected
--;
350 node
->flags
|= NODE_FLAGS_DISCONNECTED
| NODE_FLAGS_UNHEALTHY
;
352 node
->dead_count
= 0;
354 DEBUG(DEBUG_ERR
,("%s: node %s is dead: %u connected\n",
355 node
->ctdb
->name
, node
->name
, node
->ctdb
->num_connected
));
356 ctdb_daemon_cancel_controls(node
->ctdb
, node
);
360 called by the transport layer when a node is connected
362 void ctdb_node_connected(struct ctdb_node
*node
)
364 if (!(node
->flags
& NODE_FLAGS_DISCONNECTED
)) {
365 DEBUG(DEBUG_INFO
,("%s: node %s is already marked connected: %u connected\n",
366 node
->ctdb
->name
, node
->name
,
367 node
->ctdb
->num_connected
));
370 node
->ctdb
->num_connected
++;
371 node
->dead_count
= 0;
372 node
->flags
&= ~NODE_FLAGS_DISCONNECTED
;
374 ("%s: connected to %s - %u connected\n",
375 node
->ctdb
->name
, node
->name
, node
->ctdb
->num_connected
));
/*
  state carried from ctdb_defer_packet() to queue_next_trigger():
  the context to deliver to and the copy of the packet to deliver
*/
struct queue_next {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
};
385 triggered when a deferred packet is due
387 static void queue_next_trigger(struct tevent_context
*ev
,
388 struct tevent_timer
*te
,
389 struct timeval t
, void *private_data
)
391 struct queue_next
*q
= talloc_get_type(private_data
, struct queue_next
);
392 ctdb_input_pkt(q
->ctdb
, q
->hdr
);
397 defer a packet, so it is processed on the next event loop
398 this is used for sending packets to ourselves
400 static void ctdb_defer_packet(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
402 struct queue_next
*q
;
403 q
= talloc(ctdb
, struct queue_next
);
405 DEBUG(DEBUG_ERR
,(__location__
" Failed to allocate deferred packet\n"));
409 q
->hdr
= talloc_memdup(q
, hdr
, hdr
->length
);
410 if (q
->hdr
== NULL
) {
412 DEBUG(DEBUG_ERR
,("Error copying deferred packet to self\n"));
416 /* use this to put packets directly into our recv function */
417 ctdb_input_pkt(q
->ctdb
, q
->hdr
);
419 tevent_add_timer(ctdb
->ev
, q
, timeval_zero(), queue_next_trigger
, q
);
425 broadcast a packet to all nodes
427 static void ctdb_broadcast_packet_all(struct ctdb_context
*ctdb
,
428 struct ctdb_req_header
*hdr
)
431 for (i
=0; i
< ctdb
->num_nodes
; i
++) {
432 if (ctdb
->nodes
[i
]->flags
& NODE_FLAGS_DELETED
) {
435 hdr
->destnode
= ctdb
->nodes
[i
]->pnn
;
436 ctdb_queue_packet(ctdb
, hdr
);
441 broadcast a packet to all active nodes
443 static void ctdb_broadcast_packet_active(struct ctdb_context
*ctdb
,
444 struct ctdb_req_header
*hdr
)
447 for (i
= 0; i
< ctdb
->num_nodes
; i
++) {
448 if (ctdb
->nodes
[i
]->flags
& NODE_FLAGS_INACTIVE
) {
452 hdr
->destnode
= ctdb
->nodes
[i
]->pnn
;
453 ctdb_queue_packet(ctdb
, hdr
);
458 broadcast a packet to all connected nodes
460 static void ctdb_broadcast_packet_connected(struct ctdb_context
*ctdb
,
461 struct ctdb_req_header
*hdr
)
464 for (i
=0; i
< ctdb
->num_nodes
; i
++) {
465 if (ctdb
->nodes
[i
]->flags
& NODE_FLAGS_DELETED
) {
468 if (!(ctdb
->nodes
[i
]->flags
& NODE_FLAGS_DISCONNECTED
)) {
469 hdr
->destnode
= ctdb
->nodes
[i
]->pnn
;
470 ctdb_queue_packet(ctdb
, hdr
);
476 queue a packet or die
478 void ctdb_queue_packet(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
480 struct ctdb_node
*node
;
482 switch (hdr
->destnode
) {
483 case CTDB_BROADCAST_ALL
:
484 ctdb_broadcast_packet_all(ctdb
, hdr
);
486 case CTDB_BROADCAST_ACTIVE
:
487 ctdb_broadcast_packet_active(ctdb
, hdr
);
489 case CTDB_BROADCAST_CONNECTED
:
490 ctdb_broadcast_packet_connected(ctdb
, hdr
);
494 CTDB_INCREMENT_STAT(ctdb
, node_packets_sent
);
496 if (!ctdb_validate_pnn(ctdb
, hdr
->destnode
)) {
497 DEBUG(DEBUG_CRIT
,(__location__
" can't send to node %u that does not exist\n",
502 node
= ctdb
->nodes
[hdr
->destnode
];
504 if (node
->flags
& NODE_FLAGS_DELETED
) {
505 DEBUG(DEBUG_ERR
, (__location__
" Can not queue packet to DELETED node %d\n", hdr
->destnode
));
509 if (node
->pnn
== ctdb
->pnn
) {
510 ctdb_defer_packet(ctdb
, hdr
);
514 if (ctdb
->methods
== NULL
) {
515 DEBUG(DEBUG_ALERT
, (__location__
" Can not queue packet. "
516 "Transport is DOWN\n"));
521 if (ctdb
->methods
->queue_pkt(node
, (uint8_t *)hdr
, hdr
->length
) != 0) {
522 ctdb_fatal(ctdb
, "Unable to queue packet\n");
530 a valgrind hack to allow us to get opcode specific backtraces
531 very ugly, and relies on no compiler optimisation!
533 void ctdb_queue_packet_opcode(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
, unsigned opcode
)
536 #define DO_OP(x) case x: ctdb_queue_packet(ctdb, hdr); break
638 ctdb_queue_packet(ctdb
, hdr
);