2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * localhost interprocess communication for servers
13 * currently handled by a localhost TCP socket, or by a Unix-domain socket
 * when USE_UNIX_SOCKETS is defined
14 * (yes, this needs to be replaced someday)
18 #define FD_SETSIZE 65536
21 #include <afsconfig.h>
22 #include <afs/param.h>
28 #include <afs/afsint.h>
29 #include <afs/errors.h>
30 #include <rx/rx_queue.h>
33 #include "daemon_com.h"
36 #include <afs/afssyscalls.h>
40 #include "partition.h"
42 #include <rx/rx_queue.h>
44 #ifdef USE_UNIX_SOCKETS
45 #include <afs/afsutil.h>
/* Global function pointer taking a VolumeId and returning int; it is
 * assigned and invoked outside this section. NOTE(review): the name
 * suggests it breaks client callbacks on a volume — confirm. */
49 int (*V_BreakVolumeCallbacks
) (VolumeId
);
/* Client-count limit described by the trailing comment. NOTE(review): not
 * referenced by the code visible in this section. */
51 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
52 * move = dump+restore can run on single server */
/* NOTE(review): the visible SYNC_bindSock() loop is bounded by
 * state->bind_retry_limit, not by this constant. */
54 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
/* Forward declaration of the single-attempt send/receive exchange that
 * SYNC_ask() wraps with retry and reconnect handling. */
56 static int SYNC_ask_internal(SYNC_client_state
* state
, SYNC_command
* com
, SYNC_response
* res
);
60 * On AIX, connect() and bind() require use of SUN_LEN() macro;
61 * sizeof(struct sockaddr_un) will not suffice.
/* AFS_SOCKADDR_LEN(sa): address-length argument passed to connect() and
 * bind(). It expands to SUN_LEN(sa) on AIX Unix-domain builds and to
 * sizeof(*sa) otherwise. NOTE(review): the #else / #endif lines of this
 * conditional are not visible here. */
63 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
64 #define AFS_SOCKADDR_LEN(sa) SUN_LEN(sa)
66 #define AFS_SOCKADDR_LEN(sa) sizeof(*sa)
70 /* daemon com SYNC general interfaces */
73 * fill in sockaddr structure.
75 * @param[in] endpoint pointer to sync endpoint object
76 * @param[out] addr pointer to sockaddr structure
78 * @post sockaddr structure populated using information from
82 SYNC_getAddr(SYNC_endpoint_t
* endpoint
, SYNC_sockaddr_t
* addr
)
/* Build the address of a SYNC endpoint into *addr. The whole structure is
 * zeroed first, so any field not assigned below is 0. */
84 memset(addr
, 0, sizeof(*addr
));
86 #ifdef USE_UNIX_SOCKETS
/* Unix-domain variant. The socket path is
 * AFSDIR_SERVER_LOCAL_DIRPATH "/" endpoint->un. snprintf() silently
 * truncates an over-long path to fit sun_path; the explicit terminator
 * below guarantees NUL termination. NOTE(review): truncation is not
 * reported to the caller. */
87 addr
->sun_family
= AF_UNIX
;
88 snprintf(addr
->sun_path
, sizeof(addr
->sun_path
), "%s/%s",
89 AFSDIR_SERVER_LOCAL_DIRPATH
, endpoint
->un
);
90 addr
->sun_path
[sizeof(addr
->sun_path
) - 1] = '\0';
91 #else /* !USE_UNIX_SOCKETS */
/* TCP variant: loopback address 127.0.0.1 (0x7f000001), with the port
 * taken from endpoint->in. sin_len is set only where the platform's
 * sockaddr has that member. NOTE(review): the #endif closing
 * STRUCT_SOCKADDR_HAS_SA_LEN is not visible here. */
92 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
93 addr
->sin_len
= sizeof(struct sockaddr_in
);
95 addr
->sin_addr
.s_addr
= htonl(0x7f000001);
96 addr
->sin_family
= AF_INET
; /* was localhost->h_addrtype */
97 addr
->sin_port
= htons(endpoint
->in
); /* XXXX htons not _really_ necessary */
98 #endif /* !USE_UNIX_SOCKETS */
102 * get a socket descriptor of the appropriate domain.
104 * @param[in] endpoint pointer to sync endpoint object
106 * @return socket descriptor
108 * @post socket of domain specified in endpoint structure is created and
109 * returned to caller.
112 SYNC_getSock(SYNC_endpoint_t
* endpoint
)
/* Create a SOCK_STREAM socket in the address family given by
 * endpoint->domain. A socket() failure is never returned to the caller:
 * the result is checked only through opr_Verify(). NOTE(review):
 * opr_Verify() presumably aborts when its condition is false — confirm. */
115 opr_Verify((sd
= socket(endpoint
->domain
, SOCK_STREAM
, 0)) >= 0);
119 /* daemon com SYNC client interface */
122 * open a client connection to a sync server
124 * @param[in] state pointer to sync client handle
126 * @return operation status
129 * @note at present, this routine aborts rather than returning an error code
132 SYNC_connect(SYNC_client_state
* state
)
/* Server address, filled in by SYNC_getAddr() below. */
134 SYNC_sockaddr_t addr
;
135 /* I can't believe the following is needed for localhost connections!! */
/* Retry schedule for connect(); the trailing 0 presumably ends it.
 * NOTE(review): the loop that advances `timeout` through this table, and
 * any delay between attempts, are on lines not visible here. */
136 static time_t backoff
[] =
137 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
138 time_t *timeout
= &backoff
[0];
/* The handle already holds a socket. NOTE(review): the action taken in
 * this case is not visible here. */
140 if (state
->fd
!= OSI_NULLSOCKET
) {
144 SYNC_getAddr(&state
->endpoint
, &addr
);
/* Create a new socket and try to connect it. A failed attempt is closed
 * by SYNC_disconnect() below, so each retry presumably starts with a
 * fresh socket. */
147 state
->fd
= SYNC_getSock(&state
->endpoint
);
148 if (connect(state
->fd
, (struct sockaddr
*)&addr
, AFS_SOCKADDR_LEN(&addr
)) >= 0)
/* Failed attempt: log it, then close the socket (which also resets
 * state->fd) before retrying. */
153 Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
155 SYNC_disconnect(state
);
/* Retry schedule exhausted. */
158 perror("SYNC_connect failed (giving up!)");
163 * forcibly disconnect a sync client handle.
165 * @param[in] state pointer to sync client handle
167 * @return operation status
171 SYNC_disconnect(SYNC_client_state
* state
)
/* Close the handle's socket and mark the handle as disconnected by setting
 * state->fd to OSI_NULLSOCKET. NOTE(review): no guard is visible for a
 * handle whose fd is already OSI_NULLSOCKET; rk_closesocket() is called
 * regardless. */
173 rk_closesocket(state
->fd
);
174 state
->fd
= OSI_NULLSOCKET
;
179 * gracefully disconnect a sync client handle.
181 * @param[in] state pointer to sync client handle
183 * @return operation status
184 * @retval SYNC_OK success
187 SYNC_closeChannel(SYNC_client_state
* state
)
/* Scratch buffer for the reply. NOTE(review): SYNC_PROTO_BUF_DECL
 * presumably declares a SYNC_PROTO_MAX_LEN-byte buffer — confirm. */
191 SYNC_PROTO_BUF_DECL(ores
);
/* The handle has no socket. NOTE(review): the resulting action is on a
 * line not visible here. */
193 if (state
->fd
== OSI_NULLSOCKET
)
/* Build a header-only SYNC_COM_CHANNEL_CLOSE request flagged with
 * SYNC_FLAG_CHANNEL_SHUTDOWN; the reply buffer is ores. */
196 memset(&com
, 0, sizeof(com
));
197 memset(&res
, 0, sizeof(res
));
199 res
.payload
.len
= SYNC_PROTO_MAX_LEN
;
200 res
.payload
.buf
= ores
;
202 com
.hdr
.command
= SYNC_COM_CHANNEL_CLOSE
;
203 com
.hdr
.command_len
= sizeof(SYNC_command_hdr
);
204 com
.hdr
.flags
|= SYNC_FLAG_CHANNEL_SHUTDOWN
;
206 /* in case the other end dropped, don't do any retries */
/* Note: this permanently overwrites the handle's retry_limit and
 * hard_timeout. */
207 state
->retry_limit
= 0;
208 state
->hard_timeout
= 0;
/* The result of SYNC_ask() is ignored, and the socket is closed
 * unconditionally afterwards. SYNC_ask_internal() reports SYNC_OK for a
 * close request as soon as it has been written. */
210 SYNC_ask(state
, &com
, &res
);
211 SYNC_disconnect(state
);
217 * forcibly break a client connection, and then create a new connection.
219 * @param[in] state pointer to sync client handle
221 * @post old connection dropped; new connection established
223 * @return @see SYNC_connect()
226 SYNC_reconnect(SYNC_client_state
* state
)
/* Close the current socket, then open a new connection. SYNC_connect()'s
 * result is passed back to the caller unchanged. */
228 SYNC_disconnect(state
);
229 return SYNC_connect(state
);
233 * send a command to a sync server and wait for a response.
235 * @param[in] state pointer to sync client handle
236 * @param[in] com command object
237 * @param[out] res response object
239 * @return operation status
240 * @retval SYNC_OK success
241 * @retval SYNC_COM_ERROR communications error
242 * @retval SYNC_BAD_COMMAND server did not recognize command code
244 * @note this routine merely handles error processing; SYNC_ask_internal()
245 * handles the low-level details of communicating with the SYNC server.
247 * @see SYNC_ask_internal
250 SYNC_ask(SYNC_client_state
* state
, SYNC_command
* com
, SYNC_response
* res
)
/* Send a command and wait for its response, retrying across reconnects.
 * `timeout` is an absolute deadline in FT_ApproxTime() seconds; `code`
 * holds the result of the most recent attempt. */
253 afs_uint32 now
, timeout
, code
=SYNC_OK
;
/* Not connected. NOTE(review): the lines between these two checks are not
 * visible here; presumably they call SYNC_connect(). If the handle is
 * still unconnected afterwards, fail with SYNC_COM_ERROR. */
255 if (state
->fd
== OSI_NULLSOCKET
) {
259 if (state
->fd
== OSI_NULLSOCKET
) {
260 return SYNC_COM_ERROR
;
/* Demand-attach builds advertise the DAFS protocol extensions. */
263 #ifdef AFS_DEMAND_ATTACH_FS
264 com
->hdr
.flags
|= SYNC_FLAG_DAFS_EXTENSIONS
;
/* Attempt loop. It continues while tries <= retry_limit and the deadline
 * (start time + hard_timeout seconds) has not passed; `now` is refreshed
 * after every attempt. NOTE(review): the loop's initialization of `tries`
 * is on a line not visible here. */
267 now
= FT_ApproxTime();
268 timeout
= now
+ state
->hard_timeout
;
270 (tries
<= state
->retry_limit
) && (now
<= timeout
);
271 tries
++, now
= FT_ApproxTime()) {
272 code
= SYNC_ask_internal(state
, com
, res
);
273 if (code
== SYNC_OK
) {
275 } else if (code
== SYNC_BAD_COMMAND
) {
/* The server did not recognise the command code: version skew. */
276 Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
277 "fileserver, volserver, salvageserver and salvager are same "
278 "version\n", state
->proto_name
);
280 } else if ((code
== SYNC_COM_ERROR
) && (tries
< state
->retry_limit
)) {
/* Transport failure with retries left: reconnect and try again. */
281 Log("SYNC_ask: protocol communications failure on circuit '%s'; "
282 "attempting reconnect to server\n", state
->proto_name
);
283 SYNC_reconnect(state
);
287 * unknown (probably protocol-specific) response code, pass it up to
288 * the caller, and let them deal with it
/* Still failing at the transport level after the retry/deadline budget.
 * `timeout` is an afs_uint32, so it is printed with %u (not %d). */
294 if (code
== SYNC_COM_ERROR
) {
295 Log("SYNC_ask: too many / too latent fatal protocol errors on circuit "
296 "'%s'; giving up (tries %d timeout %u)\n",
297 state
->proto_name
, tries
, timeout
);
304 * send a command to a sync server and wait for a response.
306 * @param[in] state pointer to sync client handle
307 * @param[in] com command object
308 * @param[out] res response object
310 * @return operation status
311 * @retval SYNC_OK success
312 * @retval SYNC_COM_ERROR communications error
317 SYNC_ask_internal(SYNC_client_state
* state
, SYNC_command
* com
, SYNC_response
* res
)
/* Perform one request/response exchange. Every outcome is recorded in
 * res->hdr.response, which is also the return value. buf is the staging
 * buffer for the outgoing packet (and for the reply on the recv() path). */
320 SYNC_PROTO_BUF_DECL(buf
);
/* Refuse to talk on an unconnected handle. */
326 if (state
->fd
== OSI_NULLSOCKET
) {
327 Log("SYNC_ask: invalid sync file descriptor on circuit '%s'\n",
329 res
->hdr
.response
= SYNC_COM_ERROR
;
/* The whole command (header + payload) must fit in buf.
 * NOTE(review): there is no visible check that command_len >=
 * sizeof(com->hdr); a smaller value would make the payload length
 * computed further below wrap around. */
333 if (com
->hdr
.command_len
> SYNC_PROTO_MAX_LEN
) {
334 Log("SYNC_ask: internal SYNC buffer too small on circuit '%s'; "
335 "please file a bug\n", state
->proto_name
);
336 res
->hdr
.response
= SYNC_COM_ERROR
;
341 * fill in some common header fields
/* Stamp the protocol version and bump the client's packet and command
 * sequence counters. */
343 com
->hdr
.proto_version
= state
->proto_version
;
344 com
->hdr
.pkt_seq
= ++state
->pkt_seq
;
345 com
->hdr
.com_seq
= ++state
->com_seq
;
/* Identify the sending process and thread. NOTE(review): the conditional
 * closed by the #endif marked !AFS_NT40_ENV below is not visible;
 * presumably this is compiled only for non-Windows builds. */
350 com
->hdr
.pid
= getpid();
351 #ifdef AFS_PTHREAD_ENV
352 com
->hdr
.tid
= afs_pointer_to_int(pthread_self());
/* Non-pthread (presumably LWP) builds: use the current thread's index,
 * or 0 when there is no thread handle. */
355 PROCESS handle
= LWP_ThreadId();
356 com
->hdr
.tid
= (handle
) ? handle
->index
: 0;
358 #endif /* !AFS_PTHREAD_ENV */
359 #endif /* !AFS_NT40_ENV */
/* Serialize the header, then (command_len - header size) payload bytes.
 * NOTE(review): the copy is gated on payload.len being nonzero, but its
 * length comes from command_len, so the two must be kept consistent by
 * the caller. */
361 memcpy(buf
, &com
->hdr
, sizeof(com
->hdr
));
362 if (com
->payload
.len
) {
363 memcpy(buf
+ sizeof(com
->hdr
), com
->payload
.buf
,
364 com
->hdr
.command_len
- sizeof(com
->hdr
));
/* AFS_NT40_ENV (Windows) path: send() the packet, then recv() the reply
 * into buf. */
368 n
= send(state
->fd
, buf
, com
->hdr
.command_len
, 0);
369 if (n
!= com
->hdr
.command_len
) {
370 Log("SYNC_ask: write failed on circuit '%s'\n", state
->proto_name
);
371 res
->hdr
.response
= SYNC_COM_ERROR
;
375 if (com
->hdr
.command
== SYNC_COM_CHANNEL_CLOSE
) {
376 /* short circuit close channel requests */
377 res
->hdr
.response
= SYNC_OK
;
/* A zero-byte read or a non-EINTR error is fatal here; other results are
 * validated after the #endif. */
381 n
= recv(state
->fd
, buf
, SYNC_PROTO_MAX_LEN
, 0);
382 if (n
== 0 || (n
< 0 && WSAEINTR
!= WSAGetLastError())) {
383 Log("SYNC_ask: No response on circuit '%s'\n", state
->proto_name
);
384 res
->hdr
.response
= SYNC_COM_ERROR
;
387 #else /* !AFS_NT40_ENV */
/* Non-Windows path: write() the packet, then readv() the reply directly
 * into res->hdr and, when payload.len is nonzero, res->payload.buf. */
388 n
= write(state
->fd
, buf
, com
->hdr
.command_len
);
389 if (com
->hdr
.command_len
!= n
) {
390 Log("SYNC_ask: write failed on circuit '%s'\n", state
->proto_name
);
391 res
->hdr
.response
= SYNC_COM_ERROR
;
395 if (com
->hdr
.command
== SYNC_COM_CHANNEL_CLOSE
) {
396 /* short circuit close channel requests */
397 res
->hdr
.response
= SYNC_OK
;
401 /* receive the response */
402 iov
[0].iov_base
= (char *)&res
->hdr
;
403 iov
[0].iov_len
= sizeof(res
->hdr
);
404 if (res
->payload
.len
) {
405 iov
[1].iov_base
= (char *)res
->payload
.buf
;
406 iov
[1].iov_len
= res
->payload
.len
;
/* NOTE(review): iovcnt is assigned on lines not visible here. */
411 n
= readv(state
->fd
, iov
, iovcnt
);
412 if (n
== 0 || (n
< 0 && errno
!= EINTR
)) {
413 Log("SYNC_ask: No response on circuit '%s'\n", state
->proto_name
);
414 res
->hdr
.response
= SYNC_COM_ERROR
;
417 #endif /* !AFS_NT40_ENV */
/* The reply must at least contain a full header.
 * NOTE(review): assuming n is a signed int (its declaration is not
 * visible), a -1 let through by the EINTR exemption above converts to a
 * huge unsigned value in this comparison. This test then passes, and the
 * reply is instead rejected by the "too long" check below, with a
 * misleading log message. */
421 if (n
< sizeof(res
->hdr
)) {
422 Log("SYNC_ask: response too short on circuit '%s'\n",
424 res
->hdr
.response
= SYNC_COM_ERROR
;
/* Copy the header out of buf. NOTE(review): no enclosing conditional is
 * visible, yet the non-Windows path already read the header into res->hdr
 * via readv() and buf there still holds the outgoing command. Presumably
 * this copy is compiled only for AFS_NT40_ENV on lines not shown
 * (likewise the payload copy below) — confirm. */
428 memcpy(&res
->hdr
, buf
, sizeof(res
->hdr
));
/* Reject replies whose payload exceeds the caller's buffer. */
431 if ((n
- sizeof(res
->hdr
)) > res
->payload
.len
) {
432 Log("SYNC_ask: response too long on circuit '%s'\n",
434 res
->hdr
.response
= SYNC_COM_ERROR
;
438 memcpy(res
->payload
.buf
, buf
+ sizeof(res
->hdr
), n
- sizeof(res
->hdr
));
/* The length the server declared must equal the bytes actually received. */
441 if (res
->hdr
.response_len
!= n
) {
442 Log("SYNC_ask: length field in response inconsistent "
443 "on circuit '%s' command %ld, %d != %lu\n", state
->proto_name
,
444 afs_printable_int32_ld(com
->hdr
.command
),
446 afs_printable_uint32_lu(res
->hdr
.response_len
));
447 res
->hdr
.response
= SYNC_COM_ERROR
;
/* SYNC_DENIED is logged but still handed back to the caller. */
450 if (res
->hdr
.response
== SYNC_DENIED
) {
451 Log("SYNC_ask: negative response on circuit '%s'\n", state
->proto_name
);
/* The response code doubles as this function's return value. */
455 return res
->hdr
.response
;
460 * daemon com SYNC server-side interfaces
464 * receive a command structure off a sync socket.
466 * @param[in] state pointer to server-side state object
467 * @param[in] fd file descriptor on which to perform i/o
468 * @param[out] com sync command object to be populated
470 * @return operation status
471 * @retval SYNC_OK command received
472 * @retval SYNC_COM_ERROR there was a socket communications error
475 SYNC_getCom(SYNC_server_state_t
* state
,
/* Result code; every failure path below sets it to SYNC_COM_ERROR. */
480 afs_int32 code
= SYNC_OK
;
/* Receive buffer used by the recv() path. */
482 SYNC_PROTO_BUF_DECL(buf
);
/* AFS_NT40_ENV (Windows) path: recv() the whole command into buf. A
 * zero-byte read or a non-EINTR error is fatal. */
489 n
= recv(fd
, buf
, SYNC_PROTO_MAX_LEN
, 0);
491 if (n
== 0 || (n
< 0 && WSAEINTR
!= WSAGetLastError())) {
492 Log("SYNC_getCom: error receiving command\n");
493 code
= SYNC_COM_ERROR
;
496 #else /* !AFS_NT40_ENV */
/* Non-Windows path: readv() the header directly into com->hdr and, when
 * payload.len is nonzero, the payload into com->payload.buf. */
497 iov
[0].iov_base
= (char *)&com
->hdr
;
498 iov
[0].iov_len
= sizeof(com
->hdr
);
499 if (com
->payload
.len
) {
500 iov
[1].iov_base
= (char *)com
->payload
.buf
;
501 iov
[1].iov_len
= com
->payload
.len
;
/* NOTE(review): iovcnt is assigned on lines not visible here. */
507 n
= readv(fd
, iov
, iovcnt
);
508 if (n
== 0 || (n
< 0 && errno
!= EINTR
)) {
509 Log("SYNC_getCom: error receiving command\n");
510 code
= SYNC_COM_ERROR
;
513 #endif /* !AFS_NT40_ENV */
/* The command must at least contain a full header.
 * NOTE(review): assuming n is a signed int (declaration not visible), a
 * -1 let through by the EINTR exemption converts to a huge unsigned value
 * here. The command is then rejected by the "too long" check instead. */
517 if (n
< sizeof(com
->hdr
)) {
518 Log("SYNC_getCom: command too short\n");
519 code
= SYNC_COM_ERROR
;
/* NOTE(review): no enclosing conditional is visible, but the readv() path
 * already filled com->hdr directly. Presumably this copy (and the payload
 * copy below) is compiled only for AFS_NT40_ENV — confirm. */
523 memcpy(&com
->hdr
, buf
, sizeof(com
->hdr
));
/* Reject commands whose payload exceeds the caller's buffer.
 * NOTE(review): unlike SYNC_ask_internal(), no check that the header's
 * declared length matches n is visible in this section. */
526 if ((n
- sizeof(com
->hdr
)) > com
->payload
.len
) {
527 Log("SYNC_getCom: command too long\n");
528 code
= SYNC_COM_ERROR
;
532 memcpy(com
->payload
.buf
, buf
+ sizeof(com
->hdr
), n
- sizeof(com
->hdr
));
540 * write a response structure to a sync socket.
542 * @param[in] state handle to server-side state object
543 * @param[in] fd file descriptor on which to perform i/o
544 * @param[in] res handle to response packet
546 * @return operation status
548 * @retval SYNC_COM_ERROR
551 SYNC_putRes(SYNC_server_state_t
* state
,
/* Serialize and send a response. `code` is the result; every failure path
 * below sets it to SYNC_COM_ERROR. */
556 afs_int32 code
= SYNC_OK
;
/* Staging buffer holding the serialized header + payload. */
557 SYNC_PROTO_BUF_DECL(buf
);
/* The declared length may not exceed the header plus the caller's payload
 * buffer. NOTE(review): there is no visible lower-bound check that
 * response_len >= sizeof(res->hdr); a smaller value would make the
 * payload length computed below wrap around. */
559 if (res
->hdr
.response_len
> (sizeof(res
->hdr
) + res
->payload
.len
)) {
560 Log("SYNC_putRes: response_len field in response header inconsistent\n");
561 code
= SYNC_COM_ERROR
;
/* The response must also fit in the staging buffer. */
565 if (res
->hdr
.response_len
> SYNC_PROTO_MAX_LEN
) {
566 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
567 code
= SYNC_COM_ERROR
;
/* Demand-attach builds advertise the DAFS protocol extensions. */
571 #ifdef AFS_DEMAND_ATTACH_FS
572 res
->hdr
.flags
|= SYNC_FLAG_DAFS_EXTENSIONS
;
/* Stamp the protocol version and bump the server's packet and response
 * sequence counters. */
574 res
->hdr
.proto_version
= state
->proto_version
;
575 res
->hdr
.pkt_seq
= ++state
->pkt_seq
;
576 res
->hdr
.res_seq
= ++state
->res_seq
;
/* Serialize the header plus (response_len - header size) payload bytes,
 * then send the whole packet in one call. */
578 memcpy(buf
, &res
->hdr
, sizeof(res
->hdr
));
579 if (res
->payload
.len
) {
580 memcpy(buf
+ sizeof(res
->hdr
), res
->payload
.buf
,
581 res
->hdr
.response_len
- sizeof(res
->hdr
));
585 n
= send(fd
, buf
, res
->hdr
.response_len
, 0);
586 #else /* !AFS_NT40_ENV */
587 n
= write(fd
, buf
, res
->hdr
.response_len
);
588 #endif /* !AFS_NT40_ENV */
/* A short or failed write is a communications error. The header was
 * already copied into buf before sending, so updating res->hdr.response
 * alone does not inform the peer. It also left the return value at
 * SYNC_OK, unlike every other failure path here, so record the error in
 * `code` as well. */
590 if (res
->hdr
.response_len
!= n
) {
591 Log("SYNC_putRes: write failed\n");
592 res
->hdr
.response
= SYNC_COM_ERROR
;
code = SYNC_COM_ERROR;
600 /* return 0 for legal (null-terminated) string,
601 * 1 for illegal (unterminated) string */
603 SYNC_verifyProtocolString(char * buf
, size_t len
)
/* strnlen() examines at most len bytes of buf. A result equal to len
 * means no NUL terminator occurs within the first len bytes. */
607 s_len
= strnlen(buf
, len
);
/* 1 = unterminated (illegal) string, 0 = terminated (legal) string. */
609 return (s_len
== len
) ? 1 : 0;
613 * clean up old sockets.
615 * @param[in] state server state object
617 * @post unix domain sockets are cleaned up
620 SYNC_cleanupSock(SYNC_server_state_t
* state
)
622 #ifdef USE_UNIX_SOCKETS
/* Unix-domain builds: delete the socket file at the server's address
 * path, presumably so a later bind() does not fail on a stale file. The
 * result of remove() is ignored. NOTE(review): no action for the
 * non-Unix-socket build is visible in this section. */
623 remove(state
->addr
.sun_path
);
628 * bind socket and set it to listen state.
630 * @param[in] state server state object
632 * @return operation status
634 * @retval nonzero failure
636 * @post socket bound and set to listen state
639 SYNC_bindSock(SYNC_server_state_t
* state
)
645 /* Reuseaddr needed because system inexplicably leaves crud lying around */
647 setsockopt(state
->fd
, SOL_SOCKET
, SO_REUSEADDR
, (char *)&on
,
650 Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno
);
652 for (numTries
= 0; numTries
< state
->bind_retry_limit
; numTries
++) {
653 code
= bind(state
->fd
,
654 (struct sockaddr
*)&state
->addr
,
655 AFS_SOCKADDR_LEN(&state
->addr
));
658 Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
662 listen(state
->fd
, state
->listen_depth
);