Update NEWS for 1.6.22
[pkg-k5-afs_openafs.git] / src / vol / daemon_com.c
blobacb7360af18fe9f6ca57812484164305cfb0c4f6
1 /*
2 * Copyright 2006-2008, Sine Nomine Associates and others.
3 * All Rights Reserved.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
11 * localhost interprocess communication for servers
13 * currently handled by a localhost socket
14 * (yes, this needs to be replaced someday)
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
21 #include <afsconfig.h>
22 #include <afs/param.h>
25 #include <sys/types.h>
26 #include <stdio.h>
27 #ifdef AFS_NT40_ENV
28 #include <winsock2.h>
29 #include <time.h>
30 #else
31 #include <sys/param.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netdb.h>
35 #include <sys/time.h>
36 #include <unistd.h>
37 #endif
38 #include <errno.h>
39 #include <afs/afs_assert.h>
40 #include <signal.h>
41 #include <string.h>
44 #include <rx/xdr.h>
45 #include <afs/afsint.h>
46 #include "nfs.h"
47 #include <afs/errors.h>
48 #include "daemon_com.h"
49 #include "lwp.h"
50 #include "lock.h"
51 #include <afs/afssyscalls.h>
52 #include "ihandle.h"
53 #include "vnode.h"
54 #include "volume.h"
55 #include "partition.h"
56 #include "common.h"
57 #include <rx/rx_queue.h>
59 #ifdef USE_UNIX_SOCKETS
60 #include <afs/afsutil.h>
61 #include <sys/un.h>
62 #endif
64 int (*V_BreakVolumeCallbacks) (VolumeId);
66 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
67 * move = dump+restore can run on single server */
69 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
71 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
/*
 * On AIX, connect() and bind() require use of SUN_LEN() macro;
 * sizeof(struct sockaddr_un) will not suffice.
 */
#if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
#define AFS_SOCKADDR_LEN(sa)  SUN_LEN(sa)
#else
/* The argument is parenthesized so pointer expressions such as (p + 1) work. */
#define AFS_SOCKADDR_LEN(sa)  (sizeof(*(sa)))
#endif
85 /* daemon com SYNC general interfaces */
87 /**
88 * fill in sockaddr structure.
90 * @param[in] endpoint pointer to sync endpoint object
91 * @param[out] addr pointer to sockaddr structure
93 * @post sockaddr structure populated using information from
94 * endpoint structure.
96 void
97 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
99 #ifdef USE_UNIX_SOCKETS
100 char tbuffer[AFSDIR_PATH_MAX];
101 #endif /* USE_UNIX_SOCKETS */
103 memset(addr, 0, sizeof(*addr));
105 #ifdef USE_UNIX_SOCKETS
106 strcompose(tbuffer, AFSDIR_PATH_MAX, AFSDIR_SERVER_LOCAL_DIRPATH, "/",
107 endpoint->un, NULL);
108 addr->sun_family = AF_UNIX;
109 strncpy(addr->sun_path, tbuffer, (sizeof(struct sockaddr_un) - sizeof(short)));
110 #else /* !USE_UNIX_SOCKETS */
111 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
112 addr->sin_len = sizeof(struct sockaddr_in);
113 #endif
114 addr->sin_addr.s_addr = htonl(0x7f000001);
115 addr->sin_family = AF_INET; /* was localhost->h_addrtype */
116 addr->sin_port = htons(endpoint->in); /* XXXX htons not _really_ neccessary */
117 #endif /* !USE_UNIX_SOCKETS */
121 * get a socket descriptor of the appropriate domain.
123 * @param[in] endpoint pointer to sync endpoint object
125 * @return socket descriptor
127 * @post socket of domain specified in endpoint structure is created and
128 * returned to caller.
130 osi_socket
131 SYNC_getSock(SYNC_endpoint_t * endpoint)
133 osi_socket sd;
134 osi_Assert((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
135 return sd;
138 /* daemon com SYNC client interface */
141 * open a client connection to a sync server
143 * @param[in] state pointer to sync client handle
145 * @return operation status
146 * @retval 1 success
148 * @note at present, this routine aborts rather than returning an error code
151 SYNC_connect(SYNC_client_state * state)
153 SYNC_sockaddr_t addr;
154 /* I can't believe the following is needed for localhost connections!! */
155 static time_t backoff[] =
156 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
157 time_t *timeout = &backoff[0];
159 if (state->fd != OSI_NULLSOCKET) {
160 return 1;
163 SYNC_getAddr(&state->endpoint, &addr);
165 for (;;) {
166 state->fd = SYNC_getSock(&state->endpoint);
167 if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
168 return 1;
169 if (!*timeout)
170 break;
171 if (!(*timeout & 1))
172 Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
173 state->proto_name);
174 SYNC_disconnect(state);
175 sleep(*timeout++);
177 perror("SYNC_connect failed (giving up!)");
178 return 0;
182 * forcibly disconnect a sync client handle.
184 * @param[in] state pointer to sync client handle
186 * @retval operation status
187 * @retval 0 success
190 SYNC_disconnect(SYNC_client_state * state)
192 #ifdef AFS_NT40_ENV
193 closesocket(state->fd);
194 #else
195 close(state->fd);
196 #endif
197 state->fd = OSI_NULLSOCKET;
198 return 0;
202 * gracefully disconnect a sync client handle.
204 * @param[in] state pointer to sync client handle
206 * @return operation status
207 * @retval SYNC_OK success
209 afs_int32
210 SYNC_closeChannel(SYNC_client_state * state)
212 SYNC_command com;
213 SYNC_response res;
214 SYNC_PROTO_BUF_DECL(ores);
216 if (state->fd == OSI_NULLSOCKET)
217 return SYNC_OK;
219 memset(&com, 0, sizeof(com));
220 memset(&res, 0, sizeof(res));
222 res.payload.len = SYNC_PROTO_MAX_LEN;
223 res.payload.buf = ores;
225 com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
226 com.hdr.command_len = sizeof(SYNC_command_hdr);
227 com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
229 /* in case the other end dropped, don't do any retries */
230 state->retry_limit = 0;
231 state->hard_timeout = 0;
233 SYNC_ask(state, &com, &res);
234 SYNC_disconnect(state);
236 return SYNC_OK;
240 * forcibly break a client connection, and then create a new connection.
242 * @param[in] state pointer to sync client handle
244 * @post old connection dropped; new connection established
246 * @return @see SYNC_connect()
249 SYNC_reconnect(SYNC_client_state * state)
251 SYNC_disconnect(state);
252 return SYNC_connect(state);
256 * send a command to a sync server and wait for a response.
258 * @param[in] state pointer to sync client handle
259 * @param[in] com command object
260 * @param[out] res response object
262 * @return operation status
263 * @retval SYNC_OK success
264 * @retval SYNC_COM_ERROR communications error
265 * @retval SYNC_BAD_COMMAND server did not recognize command code
267 * @note this routine merely handles error processing; SYNC_ask_internal()
268 * handles the low-level details of communicating with the SYNC server.
270 * @see SYNC_ask_internal
272 afs_int32
273 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
275 int tries;
276 afs_uint32 now, timeout, code=SYNC_OK;
278 if (state->fd == OSI_NULLSOCKET) {
279 SYNC_connect(state);
282 if (state->fd == OSI_NULLSOCKET) {
283 return SYNC_COM_ERROR;
286 #ifdef AFS_DEMAND_ATTACH_FS
287 com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
288 #endif
290 now = FT_ApproxTime();
291 timeout = now + state->hard_timeout;
292 for (tries = 0;
293 (tries <= state->retry_limit) && (now <= timeout);
294 tries++, now = FT_ApproxTime()) {
295 code = SYNC_ask_internal(state, com, res);
296 if (code == SYNC_OK) {
297 break;
298 } else if (code == SYNC_BAD_COMMAND) {
299 Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
300 "fileserver, volserver, salvageserver and salvager are same "
301 "version\n", state->proto_name);
302 break;
303 } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
304 Log("SYNC_ask: protocol communications failure on circuit '%s'; "
305 "attempting reconnect to server\n", state->proto_name);
306 SYNC_reconnect(state);
307 /* try again */
308 } else {
310 * unknown (probably protocol-specific) response code, pass it up to
311 * the caller, and let them deal with it
313 break;
317 if (code == SYNC_COM_ERROR) {
318 Log("SYNC_ask: too many / too latent fatal protocol errors on circuit "
319 "'%s'; giving up (tries %d timeout %d)\n",
320 state->proto_name, tries, timeout);
323 return code;
327 * send a command to a sync server and wait for a response.
329 * @param[in] state pointer to sync client handle
330 * @param[in] com command object
331 * @param[out] res response object
333 * @return operation status
334 * @retval SYNC_OK success
335 * @retval SYNC_COM_ERROR communications error
337 * @internal
339 static afs_int32
340 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
342 int n;
343 SYNC_PROTO_BUF_DECL(buf);
344 #ifndef AFS_NT40_ENV
345 int iovcnt;
346 struct iovec iov[2];
347 #endif
349 if (state->fd == OSI_NULLSOCKET) {
350 Log("SYNC_ask: invalid sync file descriptor on circuit '%s'\n",
351 state->proto_name);
352 res->hdr.response = SYNC_COM_ERROR;
353 goto done;
356 if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
357 Log("SYNC_ask: internal SYNC buffer too small on circuit '%s'; "
358 "please file a bug\n", state->proto_name);
359 res->hdr.response = SYNC_COM_ERROR;
360 goto done;
364 * fill in some common header fields
366 com->hdr.proto_version = state->proto_version;
367 com->hdr.pkt_seq = ++state->pkt_seq;
368 com->hdr.com_seq = ++state->com_seq;
369 #ifdef AFS_NT40_ENV
370 com->hdr.pid = 0;
371 com->hdr.tid = 0;
372 #else
373 com->hdr.pid = getpid();
374 #ifdef AFS_PTHREAD_ENV
375 com->hdr.tid = afs_pointer_to_int(pthread_self());
376 #else
378 PROCESS handle = LWP_ThreadId();
379 com->hdr.tid = (handle) ? handle->index : 0;
381 #endif /* !AFS_PTHREAD_ENV */
382 #endif /* !AFS_NT40_ENV */
384 memcpy(buf, &com->hdr, sizeof(com->hdr));
385 if (com->payload.len) {
386 memcpy(buf + sizeof(com->hdr), com->payload.buf,
387 com->hdr.command_len - sizeof(com->hdr));
390 #ifdef AFS_NT40_ENV
391 n = send(state->fd, buf, com->hdr.command_len, 0);
392 if (n != com->hdr.command_len) {
393 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
394 res->hdr.response = SYNC_COM_ERROR;
395 goto done;
398 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
399 /* short circuit close channel requests */
400 res->hdr.response = SYNC_OK;
401 goto done;
404 n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
405 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
406 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
407 res->hdr.response = SYNC_COM_ERROR;
408 goto done;
410 #else /* !AFS_NT40_ENV */
411 n = write(state->fd, buf, com->hdr.command_len);
412 if (com->hdr.command_len != n) {
413 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
414 res->hdr.response = SYNC_COM_ERROR;
415 goto done;
418 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
419 /* short circuit close channel requests */
420 res->hdr.response = SYNC_OK;
421 goto done;
424 /* receive the response */
425 iov[0].iov_base = (char *)&res->hdr;
426 iov[0].iov_len = sizeof(res->hdr);
427 if (res->payload.len) {
428 iov[1].iov_base = (char *)res->payload.buf;
429 iov[1].iov_len = res->payload.len;
430 iovcnt = 2;
431 } else {
432 iovcnt = 1;
434 n = readv(state->fd, iov, iovcnt);
435 if (n == 0 || (n < 0 && errno != EINTR)) {
436 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
437 res->hdr.response = SYNC_COM_ERROR;
438 goto done;
440 #endif /* !AFS_NT40_ENV */
442 res->recv_len = n;
444 if (n < sizeof(res->hdr)) {
445 Log("SYNC_ask: response too short on circuit '%s'\n",
446 state->proto_name);
447 res->hdr.response = SYNC_COM_ERROR;
448 goto done;
450 #ifdef AFS_NT40_ENV
451 memcpy(&res->hdr, buf, sizeof(res->hdr));
452 #endif
454 if ((n - sizeof(res->hdr)) > res->payload.len) {
455 Log("SYNC_ask: response too long on circuit '%s'\n",
456 state->proto_name);
457 res->hdr.response = SYNC_COM_ERROR;
458 goto done;
460 #ifdef AFS_NT40_ENV
461 memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
462 #endif
464 if (res->hdr.response_len != n) {
465 Log("SYNC_ask: length field in response inconsistent "
466 "on circuit '%s'\n", state->proto_name);
467 res->hdr.response = SYNC_COM_ERROR;
468 goto done;
470 if (res->hdr.response == SYNC_DENIED) {
471 Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
474 done:
475 return res->hdr.response;
480 * daemon com SYNC server-side interfaces
484 * receive a command structure off a sync socket.
486 * @param[in] state pointer to server-side state object
487 * @param[in] fd file descriptor on which to perform i/o
488 * @param[out] com sync command object to be populated
490 * @return operation status
491 * @retval SYNC_OK command received
492 * @retval SYNC_COM_ERROR there was a socket communications error
494 afs_int32
495 SYNC_getCom(SYNC_server_state_t * state,
496 osi_socket fd,
497 SYNC_command * com)
499 int n;
500 afs_int32 code = SYNC_OK;
501 #ifdef AFS_NT40_ENV
502 SYNC_PROTO_BUF_DECL(buf);
503 #else
504 struct iovec iov[2];
505 int iovcnt;
506 #endif
508 #ifdef AFS_NT40_ENV
509 n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
511 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
512 Log("SYNC_getCom: error receiving command\n");
513 code = SYNC_COM_ERROR;
514 goto done;
516 #else /* !AFS_NT40_ENV */
517 iov[0].iov_base = (char *)&com->hdr;
518 iov[0].iov_len = sizeof(com->hdr);
519 if (com->payload.len) {
520 iov[1].iov_base = (char *)com->payload.buf;
521 iov[1].iov_len = com->payload.len;
522 iovcnt = 2;
523 } else {
524 iovcnt = 1;
527 n = readv(fd, iov, iovcnt);
528 if (n == 0 || (n < 0 && errno != EINTR)) {
529 Log("SYNC_getCom: error receiving command\n");
530 code = SYNC_COM_ERROR;
531 goto done;
533 #endif /* !AFS_NT40_ENV */
535 com->recv_len = n;
537 if (n < sizeof(com->hdr)) {
538 Log("SYNC_getCom: command too short\n");
539 code = SYNC_COM_ERROR;
540 goto done;
542 #ifdef AFS_NT40_ENV
543 memcpy(&com->hdr, buf, sizeof(com->hdr));
544 #endif
546 if ((n - sizeof(com->hdr)) > com->payload.len) {
547 Log("SYNC_getCom: command too long\n");
548 code = SYNC_COM_ERROR;
549 goto done;
551 #ifdef AFS_NT40_ENV
552 memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
553 #endif
555 done:
556 return code;
560 * write a response structure to a sync socket.
562 * @param[in] state handle to server-side state object
563 * @param[in] fd file descriptor on which to perform i/o
564 * @param[in] res handle to response packet
566 * @return operation status
567 * @retval SYNC_OK
568 * @retval SYNC_COM_ERROR
570 afs_int32
571 SYNC_putRes(SYNC_server_state_t * state,
572 osi_socket fd,
573 SYNC_response * res)
575 int n;
576 afs_int32 code = SYNC_OK;
577 SYNC_PROTO_BUF_DECL(buf);
579 if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
580 Log("SYNC_putRes: response_len field in response header inconsistent\n");
581 code = SYNC_COM_ERROR;
582 goto done;
585 if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
586 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
587 code = SYNC_COM_ERROR;
588 goto done;
591 #ifdef AFS_DEMAND_ATTACH_FS
592 res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
593 #endif
594 res->hdr.proto_version = state->proto_version;
595 res->hdr.pkt_seq = ++state->pkt_seq;
596 res->hdr.res_seq = ++state->res_seq;
598 memcpy(buf, &res->hdr, sizeof(res->hdr));
599 if (res->payload.len) {
600 memcpy(buf + sizeof(res->hdr), res->payload.buf,
601 res->hdr.response_len - sizeof(res->hdr));
604 #ifdef AFS_NT40_ENV
605 n = send(fd, buf, res->hdr.response_len, 0);
606 #else /* !AFS_NT40_ENV */
607 n = write(fd, buf, res->hdr.response_len);
608 #endif /* !AFS_NT40_ENV */
610 if (res->hdr.response_len != n) {
611 Log("SYNC_putRes: write failed\n");
612 res->hdr.response = SYNC_COM_ERROR;
613 goto done;
616 done:
617 return code;
/**
 * check that a protocol string is terminated within its buffer.
 *
 * @param[in] buf  buffer holding the string
 * @param[in] len  number of bytes of buf to examine
 *
 * @return validity of the string
 *    @retval 0 legal (afs_strnlen() found a length shorter than len)
 *    @retval 1 illegal (afs_strnlen() returned len, i.e. unterminated)
 */
int
SYNC_verifyProtocolString(char * buf, size_t len)
{
    if (afs_strnlen(buf, len) == len) {
        /* no terminator seen within the first len bytes */
        return 1;
    }
    return 0;
}
633 * clean up old sockets.
635 * @param[in] state server state object
637 * @post unix domain sockets are cleaned up
639 void
640 SYNC_cleanupSock(SYNC_server_state_t * state)
642 #ifdef USE_UNIX_SOCKETS
643 remove(state->addr.sun_path);
644 #endif
648 * bind socket and set it to listen state.
650 * @param[in] state server state object
652 * @return operation status
653 * @retval 0 success
654 * @retval nonzero failure
656 * @post socket bound and set to listen state
659 SYNC_bindSock(SYNC_server_state_t * state)
661 int code;
662 int on = 1;
663 int numTries;
665 /* Reuseaddr needed because system inexplicably leaves crud lying around */
666 code =
667 setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
668 sizeof(on));
669 if (code)
670 Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
672 for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
673 code = bind(state->fd,
674 (struct sockaddr *)&state->addr,
675 AFS_SOCKADDR_LEN(&state->addr));
676 if (code == 0)
677 break;
678 Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
679 errno);
680 sleep(5);
682 listen(state->fd, state->listen_depth);
684 return code;