2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * localhost interprocess communication for servers
13 * currently handled by a localhost socket
14 * (yes, this needs to be replaced someday)
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
25 #include <sys/types.h>
31 #include <sys/param.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
39 #include <afs/afs_assert.h>
45 #include <afs/afsint.h>
47 #include <afs/errors.h>
48 #include "daemon_com.h"
51 #include <afs/afssyscalls.h>
55 #include "partition.h"
57 #include <rx/rx_queue.h>
59 #ifdef USE_UNIX_SOCKETS
60 #include <afs/afsutil.h>
64 int (*V_BreakVolumeCallbacks
) (VolumeId
);
66 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
67 * move = dump+restore can run on single server */
69 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
71 static int SYNC_ask_internal(SYNC_client_state
* state
, SYNC_command
* com
, SYNC_response
* res
);
75 * On AIX, connect() and bind() require use of SUN_LEN() macro;
76 * sizeof(struct sockaddr_un) will not suffice.
78 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
79 #define AFS_SOCKADDR_LEN(sa) SUN_LEN(sa)
81 #define AFS_SOCKADDR_LEN(sa) sizeof(*sa)
85 /* daemon com SYNC general interfaces */
88 * fill in sockaddr structure.
90 * @param[in] endpoint pointer to sync endpoint object
91 * @param[out] addr pointer to sockaddr structure
93 * @post sockaddr structure populated using information from
97 SYNC_getAddr(SYNC_endpoint_t
* endpoint
, SYNC_sockaddr_t
* addr
)
99 #ifdef USE_UNIX_SOCKETS
100 char tbuffer
[AFSDIR_PATH_MAX
];
101 #endif /* USE_UNIX_SOCKETS */
103 memset(addr
, 0, sizeof(*addr
));
105 #ifdef USE_UNIX_SOCKETS
106 strcompose(tbuffer
, AFSDIR_PATH_MAX
, AFSDIR_SERVER_LOCAL_DIRPATH
, "/",
108 addr
->sun_family
= AF_UNIX
;
109 strncpy(addr
->sun_path
, tbuffer
, (sizeof(struct sockaddr_un
) - sizeof(short)));
110 #else /* !USE_UNIX_SOCKETS */
111 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
112 addr
->sin_len
= sizeof(struct sockaddr_in
);
114 addr
->sin_addr
.s_addr
= htonl(0x7f000001);
115 addr
->sin_family
= AF_INET
; /* was localhost->h_addrtype */
116 addr
->sin_port
= htons(endpoint
->in
); /* XXXX htons not _really_ neccessary */
117 #endif /* !USE_UNIX_SOCKETS */
121 * get a socket descriptor of the appropriate domain.
123 * @param[in] endpoint pointer to sync endpoint object
125 * @return socket descriptor
127 * @post socket of domain specified in endpoint structure is created and
128 * returned to caller.
131 SYNC_getSock(SYNC_endpoint_t
* endpoint
)
134 osi_Assert((sd
= socket(endpoint
->domain
, SOCK_STREAM
, 0)) >= 0);
138 /* daemon com SYNC client interface */
141 * open a client connection to a sync server
143 * @param[in] state pointer to sync client handle
145 * @return operation status
148 * @note at present, this routine aborts rather than returning an error code
151 SYNC_connect(SYNC_client_state
* state
)
153 SYNC_sockaddr_t addr
;
154 /* I can't believe the following is needed for localhost connections!! */
155 static time_t backoff
[] =
156 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
157 time_t *timeout
= &backoff
[0];
159 if (state
->fd
!= OSI_NULLSOCKET
) {
163 SYNC_getAddr(&state
->endpoint
, &addr
);
166 state
->fd
= SYNC_getSock(&state
->endpoint
);
167 if (connect(state
->fd
, (struct sockaddr
*)&addr
, AFS_SOCKADDR_LEN(&addr
)) >= 0)
172 Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
174 SYNC_disconnect(state
);
177 perror("SYNC_connect failed (giving up!)");
182 * forcibly disconnect a sync client handle.
184 * @param[in] state pointer to sync client handle
186 * @retval operation status
190 SYNC_disconnect(SYNC_client_state
* state
)
193 closesocket(state
->fd
);
197 state
->fd
= OSI_NULLSOCKET
;
202 * gracefully disconnect a sync client handle.
204 * @param[in] state pointer to sync client handle
206 * @return operation status
207 * @retval SYNC_OK success
210 SYNC_closeChannel(SYNC_client_state
* state
)
214 SYNC_PROTO_BUF_DECL(ores
);
216 if (state
->fd
== OSI_NULLSOCKET
)
219 memset(&com
, 0, sizeof(com
));
220 memset(&res
, 0, sizeof(res
));
222 res
.payload
.len
= SYNC_PROTO_MAX_LEN
;
223 res
.payload
.buf
= ores
;
225 com
.hdr
.command
= SYNC_COM_CHANNEL_CLOSE
;
226 com
.hdr
.command_len
= sizeof(SYNC_command_hdr
);
227 com
.hdr
.flags
|= SYNC_FLAG_CHANNEL_SHUTDOWN
;
229 /* in case the other end dropped, don't do any retries */
230 state
->retry_limit
= 0;
231 state
->hard_timeout
= 0;
233 SYNC_ask(state
, &com
, &res
);
234 SYNC_disconnect(state
);
240 * forcibly break a client connection, and then create a new connection.
242 * @param[in] state pointer to sync client handle
244 * @post old connection dropped; new connection established
246 * @return @see SYNC_connect()
249 SYNC_reconnect(SYNC_client_state
* state
)
251 SYNC_disconnect(state
);
252 return SYNC_connect(state
);
256 * send a command to a sync server and wait for a response.
258 * @param[in] state pointer to sync client handle
259 * @param[in] com command object
260 * @param[out] res response object
262 * @return operation status
263 * @retval SYNC_OK success
264 * @retval SYNC_COM_ERROR communications error
265 * @retval SYNC_BAD_COMMAND server did not recognize command code
267 * @note this routine merely handles error processing; SYNC_ask_internal()
268 * handles the low-level details of communicating with the SYNC server.
270 * @see SYNC_ask_internal
273 SYNC_ask(SYNC_client_state
* state
, SYNC_command
* com
, SYNC_response
* res
)
276 afs_uint32 now
, timeout
, code
=SYNC_OK
;
278 if (state
->fd
== OSI_NULLSOCKET
) {
282 if (state
->fd
== OSI_NULLSOCKET
) {
283 return SYNC_COM_ERROR
;
286 #ifdef AFS_DEMAND_ATTACH_FS
287 com
->hdr
.flags
|= SYNC_FLAG_DAFS_EXTENSIONS
;
290 now
= FT_ApproxTime();
291 timeout
= now
+ state
->hard_timeout
;
293 (tries
<= state
->retry_limit
) && (now
<= timeout
);
294 tries
++, now
= FT_ApproxTime()) {
295 code
= SYNC_ask_internal(state
, com
, res
);
296 if (code
== SYNC_OK
) {
298 } else if (code
== SYNC_BAD_COMMAND
) {
299 Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
300 "fileserver, volserver, salvageserver and salvager are same "
301 "version\n", state
->proto_name
);
303 } else if ((code
== SYNC_COM_ERROR
) && (tries
< state
->retry_limit
)) {
304 Log("SYNC_ask: protocol communications failure on circuit '%s'; "
305 "attempting reconnect to server\n", state
->proto_name
);
306 SYNC_reconnect(state
);
310 * unknown (probably protocol-specific) response code, pass it up to
311 * the caller, and let them deal with it
317 if (code
== SYNC_COM_ERROR
) {
318 Log("SYNC_ask: too many / too latent fatal protocol errors on circuit "
319 "'%s'; giving up (tries %d timeout %d)\n",
320 state
->proto_name
, tries
, timeout
);
327 * send a command to a sync server and wait for a response.
329 * @param[in] state pointer to sync client handle
330 * @param[in] com command object
331 * @param[out] res response object
333 * @return operation status
334 * @retval SYNC_OK success
335 * @retval SYNC_COM_ERROR communications error
340 SYNC_ask_internal(SYNC_client_state
* state
, SYNC_command
* com
, SYNC_response
* res
)
343 SYNC_PROTO_BUF_DECL(buf
);
349 if (state
->fd
== OSI_NULLSOCKET
) {
350 Log("SYNC_ask: invalid sync file descriptor on circuit '%s'\n",
352 res
->hdr
.response
= SYNC_COM_ERROR
;
356 if (com
->hdr
.command_len
> SYNC_PROTO_MAX_LEN
) {
357 Log("SYNC_ask: internal SYNC buffer too small on circuit '%s'; "
358 "please file a bug\n", state
->proto_name
);
359 res
->hdr
.response
= SYNC_COM_ERROR
;
364 * fill in some common header fields
366 com
->hdr
.proto_version
= state
->proto_version
;
367 com
->hdr
.pkt_seq
= ++state
->pkt_seq
;
368 com
->hdr
.com_seq
= ++state
->com_seq
;
373 com
->hdr
.pid
= getpid();
374 #ifdef AFS_PTHREAD_ENV
375 com
->hdr
.tid
= afs_pointer_to_int(pthread_self());
378 PROCESS handle
= LWP_ThreadId();
379 com
->hdr
.tid
= (handle
) ? handle
->index
: 0;
381 #endif /* !AFS_PTHREAD_ENV */
382 #endif /* !AFS_NT40_ENV */
384 memcpy(buf
, &com
->hdr
, sizeof(com
->hdr
));
385 if (com
->payload
.len
) {
386 memcpy(buf
+ sizeof(com
->hdr
), com
->payload
.buf
,
387 com
->hdr
.command_len
- sizeof(com
->hdr
));
391 n
= send(state
->fd
, buf
, com
->hdr
.command_len
, 0);
392 if (n
!= com
->hdr
.command_len
) {
393 Log("SYNC_ask: write failed on circuit '%s'\n", state
->proto_name
);
394 res
->hdr
.response
= SYNC_COM_ERROR
;
398 if (com
->hdr
.command
== SYNC_COM_CHANNEL_CLOSE
) {
399 /* short circuit close channel requests */
400 res
->hdr
.response
= SYNC_OK
;
404 n
= recv(state
->fd
, buf
, SYNC_PROTO_MAX_LEN
, 0);
405 if (n
== 0 || (n
< 0 && WSAEINTR
!= WSAGetLastError())) {
406 Log("SYNC_ask: No response on circuit '%s'\n", state
->proto_name
);
407 res
->hdr
.response
= SYNC_COM_ERROR
;
410 #else /* !AFS_NT40_ENV */
411 n
= write(state
->fd
, buf
, com
->hdr
.command_len
);
412 if (com
->hdr
.command_len
!= n
) {
413 Log("SYNC_ask: write failed on circuit '%s'\n", state
->proto_name
);
414 res
->hdr
.response
= SYNC_COM_ERROR
;
418 if (com
->hdr
.command
== SYNC_COM_CHANNEL_CLOSE
) {
419 /* short circuit close channel requests */
420 res
->hdr
.response
= SYNC_OK
;
424 /* receive the response */
425 iov
[0].iov_base
= (char *)&res
->hdr
;
426 iov
[0].iov_len
= sizeof(res
->hdr
);
427 if (res
->payload
.len
) {
428 iov
[1].iov_base
= (char *)res
->payload
.buf
;
429 iov
[1].iov_len
= res
->payload
.len
;
434 n
= readv(state
->fd
, iov
, iovcnt
);
435 if (n
== 0 || (n
< 0 && errno
!= EINTR
)) {
436 Log("SYNC_ask: No response on circuit '%s'\n", state
->proto_name
);
437 res
->hdr
.response
= SYNC_COM_ERROR
;
440 #endif /* !AFS_NT40_ENV */
444 if (n
< sizeof(res
->hdr
)) {
445 Log("SYNC_ask: response too short on circuit '%s'\n",
447 res
->hdr
.response
= SYNC_COM_ERROR
;
451 memcpy(&res
->hdr
, buf
, sizeof(res
->hdr
));
454 if ((n
- sizeof(res
->hdr
)) > res
->payload
.len
) {
455 Log("SYNC_ask: response too long on circuit '%s'\n",
457 res
->hdr
.response
= SYNC_COM_ERROR
;
461 memcpy(res
->payload
.buf
, buf
+ sizeof(res
->hdr
), n
- sizeof(res
->hdr
));
464 if (res
->hdr
.response_len
!= n
) {
465 Log("SYNC_ask: length field in response inconsistent "
466 "on circuit '%s'\n", state
->proto_name
);
467 res
->hdr
.response
= SYNC_COM_ERROR
;
470 if (res
->hdr
.response
== SYNC_DENIED
) {
471 Log("SYNC_ask: negative response on circuit '%s'\n", state
->proto_name
);
475 return res
->hdr
.response
;
480 * daemon com SYNC server-side interfaces
484 * receive a command structure off a sync socket.
486 * @param[in] state pointer to server-side state object
487 * @param[in] fd file descriptor on which to perform i/o
488 * @param[out] com sync command object to be populated
490 * @return operation status
491 * @retval SYNC_OK command received
492 * @retval SYNC_COM_ERROR there was a socket communications error
495 SYNC_getCom(SYNC_server_state_t
* state
,
500 afs_int32 code
= SYNC_OK
;
502 SYNC_PROTO_BUF_DECL(buf
);
509 n
= recv(fd
, buf
, SYNC_PROTO_MAX_LEN
, 0);
511 if (n
== 0 || (n
< 0 && WSAEINTR
!= WSAGetLastError())) {
512 Log("SYNC_getCom: error receiving command\n");
513 code
= SYNC_COM_ERROR
;
516 #else /* !AFS_NT40_ENV */
517 iov
[0].iov_base
= (char *)&com
->hdr
;
518 iov
[0].iov_len
= sizeof(com
->hdr
);
519 if (com
->payload
.len
) {
520 iov
[1].iov_base
= (char *)com
->payload
.buf
;
521 iov
[1].iov_len
= com
->payload
.len
;
527 n
= readv(fd
, iov
, iovcnt
);
528 if (n
== 0 || (n
< 0 && errno
!= EINTR
)) {
529 Log("SYNC_getCom: error receiving command\n");
530 code
= SYNC_COM_ERROR
;
533 #endif /* !AFS_NT40_ENV */
537 if (n
< sizeof(com
->hdr
)) {
538 Log("SYNC_getCom: command too short\n");
539 code
= SYNC_COM_ERROR
;
543 memcpy(&com
->hdr
, buf
, sizeof(com
->hdr
));
546 if ((n
- sizeof(com
->hdr
)) > com
->payload
.len
) {
547 Log("SYNC_getCom: command too long\n");
548 code
= SYNC_COM_ERROR
;
552 memcpy(com
->payload
.buf
, buf
+ sizeof(com
->hdr
), n
- sizeof(com
->hdr
));
560 * write a response structure to a sync socket.
562 * @param[in] state handle to server-side state object
563 * @param[in] fd file descriptor on which to perform i/o
564 * @param[in] res handle to response packet
566 * @return operation status
568 * @retval SYNC_COM_ERROR
571 SYNC_putRes(SYNC_server_state_t
* state
,
576 afs_int32 code
= SYNC_OK
;
577 SYNC_PROTO_BUF_DECL(buf
);
579 if (res
->hdr
.response_len
> (sizeof(res
->hdr
) + res
->payload
.len
)) {
580 Log("SYNC_putRes: response_len field in response header inconsistent\n");
581 code
= SYNC_COM_ERROR
;
585 if (res
->hdr
.response_len
> SYNC_PROTO_MAX_LEN
) {
586 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
587 code
= SYNC_COM_ERROR
;
591 #ifdef AFS_DEMAND_ATTACH_FS
592 res
->hdr
.flags
|= SYNC_FLAG_DAFS_EXTENSIONS
;
594 res
->hdr
.proto_version
= state
->proto_version
;
595 res
->hdr
.pkt_seq
= ++state
->pkt_seq
;
596 res
->hdr
.res_seq
= ++state
->res_seq
;
598 memcpy(buf
, &res
->hdr
, sizeof(res
->hdr
));
599 if (res
->payload
.len
) {
600 memcpy(buf
+ sizeof(res
->hdr
), res
->payload
.buf
,
601 res
->hdr
.response_len
- sizeof(res
->hdr
));
605 n
= send(fd
, buf
, res
->hdr
.response_len
, 0);
606 #else /* !AFS_NT40_ENV */
607 n
= write(fd
, buf
, res
->hdr
.response_len
);
608 #endif /* !AFS_NT40_ENV */
610 if (res
->hdr
.response_len
!= n
) {
611 Log("SYNC_putRes: write failed\n");
612 res
->hdr
.response
= SYNC_COM_ERROR
;
/**
 * check that a protocol string is terminated within its buffer.
 *
 * @param[in] buf  start of the string buffer
 * @param[in] len  number of bytes of buf that may be examined
 *
 * @return
 *    @retval 0 legal string: a NUL byte occurs within the first len bytes
 *    @retval 1 illegal string: no NUL byte within the first len bytes
 *              (this includes len == 0)
 */
int
SYNC_verifyProtocolString(char * buf, size_t len)
{
    const void *terminator = memchr(buf, '\0', len);

    if (terminator == NULL) {
        return 1;
    }
    return 0;
}
633 * clean up old sockets.
635 * @param[in] state server state object
637 * @post unix domain sockets are cleaned up
640 SYNC_cleanupSock(SYNC_server_state_t
* state
)
642 #ifdef USE_UNIX_SOCKETS
643 remove(state
->addr
.sun_path
);
648 * bind socket and set it to listen state.
650 * @param[in] state server state object
652 * @return operation status
654 * @retval nonzero failure
656 * @post socket bound and set to listen state
659 SYNC_bindSock(SYNC_server_state_t
* state
)
665 /* Reuseaddr needed because system inexplicably leaves crud lying around */
667 setsockopt(state
->fd
, SOL_SOCKET
, SO_REUSEADDR
, (char *)&on
,
670 Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno
);
672 for (numTries
= 0; numTries
< state
->bind_retry_limit
; numTries
++) {
673 code
= bind(state
->fd
,
674 (struct sockaddr
*)&state
->addr
,
675 AFS_SOCKADDR_LEN(&state
->addr
));
678 Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
682 listen(state
->fd
, state
->listen_depth
);