Update NEWS for 1.6.22
[pkg-k5-afs_openafs.git] / src / vol / salvsync-server.c
blobf054dd6c6512ecff81fb1f3cafa1beadc7634c55
1 /*
2 * Copyright 2006-2008, Sine Nomine Associates and others.
3 * All Rights Reserved.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
11 * salvsync-server.c
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
22 #ifndef _WIN32
23 #define FD_SETSIZE 65536
24 #endif
26 #include <afsconfig.h>
27 #include <afs/param.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #ifdef AFS_NT40_ENV
33 #include <winsock2.h>
34 #include <time.h>
35 #else
36 #include <sys/param.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <netdb.h>
40 #include <sys/time.h>
41 #endif
42 #include <errno.h>
43 #include <afs/afs_assert.h>
44 #include <signal.h>
45 #include <string.h>
48 #include <rx/xdr.h>
49 #include <afs/afsint.h>
50 #include "nfs.h"
51 #include <afs/errors.h>
52 #include "salvsync.h"
53 #include "lwp.h"
54 #include "lock.h"
55 #include <afs/afssyscalls.h>
56 #include "ihandle.h"
57 #include "vnode.h"
58 #include "volume.h"
59 #include "partition.h"
60 #include "common.h"
61 #include <rx/rx_queue.h>
62 #include <afs/procmgmt.h>
64 #if !defined(offsetof)
65 #include <stddef.h>
66 #endif
68 #ifdef USE_UNIX_SOCKETS
69 #include <afs/afsutil.h>
70 #include <sys/un.h>
71 #endif
73 #ifndef WCOREDUMP
74 #define WCOREDUMP(x) ((x) & 0200)
75 #endif
77 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
78 * move = dump+restore can run on single server */
82 * This lock controls access to the handler array.
84 struct Lock SALVSYNC_handler_lock;
87 #ifdef AFS_DEMAND_ATTACH_FS
89 * SALVSYNC is a feature specific to the demand attach fileserver
92 /* Forward declarations */
93 static void * SALVSYNC_syncThread(void *);
94 static void SALVSYNC_newconnection(osi_socket fd);
95 static void SALVSYNC_com(osi_socket fd);
96 static void SALVSYNC_Drop(osi_socket fd);
97 static void AcceptOn(void);
98 static void AcceptOff(void);
99 static void InitHandler(void);
100 static void CallHandler(fd_set * fdsetp);
101 static int AddHandler(osi_socket afd, void (*aproc) (int));
102 static int FindHandler(osi_socket afd);
103 static int FindHandler_r(osi_socket afd);
104 static int RemoveHandler(osi_socket afd);
105 static void GetHandler(fd_set * fdsetp, int *maxfdp);
107 static int AllocNode(struct SalvageQueueNode ** node);
109 static int AddToSalvageQueue(struct SalvageQueueNode * node);
110 static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
111 static void AddToPendingQueue(struct SalvageQueueNode * node);
112 static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
113 static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
114 static void UpdateCommandPrio(struct SalvageQueueNode * node);
115 static void HandlePrio(struct SalvageQueueNode * clone,
116 struct SalvageQueueNode * parent,
117 afs_uint32 new_prio);
119 static int LinkNode(struct SalvageQueueNode * parent,
120 struct SalvageQueueNode * clone);
122 static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
123 struct SalvageQueueNode ** parent);
124 static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
125 struct SalvageQueueNode ** parent);
126 static void AddNodeToHash(struct SalvageQueueNode * node);
128 static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
129 static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
130 static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
131 static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
132 static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
135 extern int LogLevel;
136 extern int VInit;
137 extern pthread_mutex_t vol_salvsync_mutex;
140 * salvsync server socket handle.
142 static SYNC_server_state_t salvsync_server_state =
143 { OSI_NULLSOCKET, /* file descriptor */
144 SALVSYNC_ENDPOINT_DECL, /* server endpoint */
145 SALVSYNC_PROTO_VERSION, /* protocol version */
146 5, /* bind() retry limit */
147 100, /* listen() queue depth */
148 "SALVSYNC", /* protocol name string */
153 * queue of all volumes waiting to be salvaged.
155 struct SalvageQueue {
156 volatile int total_len;
157 volatile afs_int32 last_insert; /**< id of last partition to have a salvage node inserted */
158 volatile int len[VOLMAXPARTS+1];
159 volatile struct rx_queue part[VOLMAXPARTS+1]; /**< per-partition queues of pending salvages */
160 pthread_cond_t cv;
162 static struct SalvageQueue salvageQueue; /* volumes waiting to be salvaged */
165 * queue of all volumes currently being salvaged.
167 struct QueueHead {
168 volatile struct rx_queue q; /**< queue of salvages in progress */
169 volatile int len; /**< length of in-progress queue */
170 pthread_cond_t queue_change_cv;
172 static struct QueueHead pendingQueue; /* volumes being salvaged */
174 /* XXX
175 * whether a partition has a salvage in progress
177 * the salvager code only permits one salvage per partition at a time
179 * the following hack tries to keep salvaged parallelism high by
180 * only permitting one salvage dispatch per partition at a time
182 * unfortunately, the parallel salvager currently
183 * has a rather braindead routine that won't permit
184 * multiple salvages on the same "device". this
185 * function happens to break pretty badly on lvm, raid luns, etc.
187 * this hack isn't good enough to stop the device limiting code from
188 * crippling performance. someday that code needs to be rewritten
190 static int partition_salvaging[VOLMAXPARTS+1];
192 static int HandlerFD[MAXHANDLERS];
193 static void (*HandlerProc[MAXHANDLERS]) (int);
195 #define VSHASH_SIZE 64
196 #define VSHASH_MASK (VSHASH_SIZE-1)
197 #define VSHASH(vid) ((vid)&VSHASH_MASK)
199 static struct QueueHead SalvageHashTable[VSHASH_SIZE];
201 static struct SalvageQueueNode *
202 LookupNode(afs_uint32 vid, char * partName,
203 struct SalvageQueueNode ** parent)
205 struct rx_queue *qp, *nqp;
206 struct SalvageQueueNode *vsp;
207 int idx = VSHASH(vid);
209 for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
210 vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
211 if ((vsp->command.sop.volume == vid) &&
212 !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
213 break;
217 if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
218 vsp = NULL;
221 if (parent) {
222 if (vsp) {
223 *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
224 vsp->volgroup.parent : vsp;
225 } else {
226 *parent = NULL;
230 return vsp;
233 static struct SalvageQueueNode *
234 LookupNodeByCommand(SALVSYNC_command_hdr * qry,
235 struct SalvageQueueNode ** parent)
237 return LookupNode(qry->volume, qry->partName, parent);
240 static void
241 AddNodeToHash(struct SalvageQueueNode * node)
243 int idx = VSHASH(node->command.sop.volume);
245 if (queue_IsOnQueue(&node->hash_chain)) {
246 return;
249 queue_Append(&SalvageHashTable[idx], &node->hash_chain);
250 SalvageHashTable[idx].len++;
#if 0
/**
 * remove a salvage queue node from the volume-id hash table.
 *
 * currently compiled out; retained for future use.
 *
 * @param[in] node  queue node to remove
 *
 * @internal
 */
static void
DeleteNodeFromHash(struct SalvageQueueNode * node)
{
    int bucket = VSHASH(node->command.sop.volume);

    /* not hashed; nothing to do */
    if (queue_IsNotOnQueue(&node->hash_chain)) {
	return;
    }

    queue_Remove(&node->hash_chain);
    SalvageHashTable[bucket].len--;
}
#endif
268 void
269 SALVSYNC_salvInit(void)
271 int i;
272 pthread_t tid;
273 pthread_attr_t tattr;
275 /* initialize the queues */
276 Lock_Init(&SALVSYNC_handler_lock);
277 CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
278 for (i = 0; i <= VOLMAXPARTS; i++) {
279 queue_Init(&salvageQueue.part[i]);
280 salvageQueue.len[i] = 0;
282 CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
283 queue_Init(&pendingQueue);
284 salvageQueue.total_len = pendingQueue.len = 0;
285 salvageQueue.last_insert = -1;
286 memset(partition_salvaging, 0, sizeof(partition_salvaging));
288 for (i = 0; i < VSHASH_SIZE; i++) {
289 CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
290 SalvageHashTable[i].len = 0;
291 queue_Init(&SalvageHashTable[i]);
294 /* start the salvsync thread */
295 osi_Assert(pthread_attr_init(&tattr) == 0);
296 osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
297 osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
300 static void
301 CleanFDs(void)
303 int i;
304 for (i = 0; i < MAXHANDLERS; ++i) {
305 if (HandlerFD[i] >= 0) {
306 SALVSYNC_Drop(HandlerFD[i]);
310 /* just in case we were in AcceptOff mode, and thus this fd wouldn't
311 * have a handler */
312 close(salvsync_server_state.fd);
313 salvsync_server_state.fd = OSI_NULLSOCKET;
316 static fd_set SALVSYNC_readfds;
318 static void *
319 SALVSYNC_syncThread(void * args)
321 int code;
322 SYNC_server_state_t * state = &salvsync_server_state;
324 /* when we fork, the child needs to close the salvsync server sockets,
325 * otherwise, it may get salvsync requests, instead of the parent
326 * salvageserver */
327 osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);
329 SYNC_getAddr(&state->endpoint, &state->addr);
330 SYNC_cleanupSock(state);
332 #ifndef AFS_NT40_ENV
333 (void)signal(SIGPIPE, SIG_IGN);
334 #endif
336 state->fd = SYNC_getSock(&state->endpoint);
337 code = SYNC_bindSock(state);
338 osi_Assert(!code);
340 InitHandler();
341 AcceptOn();
343 for (;;) {
344 int maxfd;
345 struct timeval s_timeout;
346 GetHandler(&SALVSYNC_readfds, &maxfd);
347 s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
348 s_timeout.tv_usec = 0;
349 /* Note: check for >= 1 below is essential since IOMGR_select
350 * doesn't have exactly same semantics as select.
352 if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
353 CallHandler(&SALVSYNC_readfds);
356 return NULL;
359 static void
360 SALVSYNC_newconnection(int afd)
362 #ifdef USE_UNIX_SOCKETS
363 struct sockaddr_un other;
364 #else /* USE_UNIX_SOCKETS */
365 struct sockaddr_in other;
366 #endif
367 int fd;
368 socklen_t junk;
370 junk = sizeof(other);
371 fd = accept(afd, (struct sockaddr *)&other, &junk);
372 if (fd == OSI_NULLSOCKET) {
373 osi_Panic("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
374 } else if (!AddHandler(fd, SALVSYNC_com)) {
375 AcceptOff();
376 osi_Assert(AddHandler(fd, SALVSYNC_com));
380 /* this function processes commands from an salvsync file descriptor (fd) */
381 static afs_int32 SALV_cnt = 0;
382 static void
383 SALVSYNC_com(osi_socket fd)
385 SYNC_command com;
386 SYNC_response res;
387 SALVSYNC_response_hdr sres_hdr;
388 SALVSYNC_command scom;
389 SALVSYNC_response sres;
390 SYNC_PROTO_BUF_DECL(buf);
392 memset(&com, 0, sizeof(com));
393 memset(&res, 0, sizeof(res));
394 memset(&scom, 0, sizeof(scom));
395 memset(&sres, 0, sizeof(sres));
396 memset(&sres_hdr, 0, sizeof(sres_hdr));
398 com.payload.buf = (void *)buf;
399 com.payload.len = SYNC_PROTO_MAX_LEN;
400 res.payload.buf = (void *) &sres_hdr;
401 res.payload.len = sizeof(sres_hdr);
402 res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
403 res.hdr.proto_version = SALVSYNC_PROTO_VERSION;
405 scom.hdr = &com.hdr;
406 scom.sop = (SALVSYNC_command_hdr *) buf;
407 scom.com = &com;
408 sres.hdr = &res.hdr;
409 sres.sop = &sres_hdr;
410 sres.res = &res;
412 SALV_cnt++;
413 if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
414 Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);
415 SALVSYNC_Drop(fd);
416 return;
419 if (com.recv_len < sizeof(com.hdr)) {
420 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
421 res.hdr.response = SYNC_COM_ERROR;
422 res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
423 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
424 goto respond;
427 if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
428 Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
429 res.hdr.response = SYNC_COM_ERROR;
430 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
431 goto respond;
434 if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
435 res.hdr.response = SYNC_OK;
436 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
438 /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
439 * never wait for a response. */
440 goto done;
443 if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
444 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
445 res.hdr.response = SYNC_COM_ERROR;
446 res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
447 res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
448 goto respond;
451 res.hdr.com_seq = com.hdr.com_seq;
453 VOL_LOCK;
454 switch (com.hdr.command) {
455 case SALVSYNC_NOP:
456 break;
457 case SALVSYNC_SALVAGE:
458 case SALVSYNC_RAISEPRIO:
459 res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
460 break;
461 case SALVSYNC_CANCEL:
462 /* cancel a salvage */
463 res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
464 break;
465 case SALVSYNC_CANCELALL:
466 /* cancel all queued salvages */
467 res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
468 break;
469 case SALVSYNC_QUERY:
470 /* query whether a volume is done salvaging */
471 res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
472 break;
473 case SALVSYNC_OP_LINK:
474 /* link a clone to its parent in the scheduler */
475 res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
476 break;
477 default:
478 res.hdr.response = SYNC_BAD_COMMAND;
479 break;
482 sres_hdr.sq_len = salvageQueue.total_len;
483 sres_hdr.pq_len = pendingQueue.len;
484 VOL_UNLOCK;
486 respond:
487 SYNC_putRes(&salvsync_server_state, fd, &res);
489 done:
490 if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
491 SALVSYNC_Drop(fd);
496 * request that a volume be salvaged.
498 * @param[in] com inbound command object
499 * @param[out] res outbound response object
501 * @return operation status
502 * @retval SYNC_OK success
503 * @retval SYNC_DENIED failed to enqueue request
504 * @retval SYNC_FAILED malformed command packet
506 * @note this is a SALVSYNC protocol rpc handler
508 * @internal
510 * @post the volume is enqueued in the to-be-salvaged queue.
511 * if the volume was already in the salvage queue, its
512 * priority (and thus its location in the queue) are
513 * updated.
515 static afs_int32
516 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
518 afs_int32 code = SYNC_OK;
519 struct SalvageQueueNode * node, * clone;
520 int hash = 0;
522 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
523 code = SYNC_FAILED;
524 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
525 goto done;
528 clone = LookupNodeByCommand(com->sop, &node);
530 if (node == NULL) {
531 if (AllocNode(&node)) {
532 code = SYNC_DENIED;
533 res->hdr->reason = SYNC_REASON_NOMEM;
534 goto done;
536 clone = node;
537 hash = 1;
540 HandlePrio(clone, node, com->sop->prio);
542 switch (node->state) {
543 case SALVSYNC_STATE_QUEUED:
544 UpdateCommandPrio(node);
545 break;
547 case SALVSYNC_STATE_ERROR:
548 case SALVSYNC_STATE_DONE:
549 case SALVSYNC_STATE_UNKNOWN:
550 memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
551 memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
554 * make sure volgroup parent partition path is kept coherent
556 * If we ever want to support non-COW clones on a machine holding
557 * the RW site, please note that this code does not work under the
558 * conditions where someone zaps a COW clone on partition X, and
559 * subsequently creates a full clone on partition Y -- we'd need
560 * an inverse to SALVSYNC_com_Link.
561 * -- tkeiser 11/28/2007
563 strcpy(node->command.sop.partName, com->sop->partName);
565 if (AddToSalvageQueue(node)) {
566 code = SYNC_DENIED;
568 break;
570 default:
571 break;
574 if (hash) {
575 AddNodeToHash(node);
578 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
579 res->sop->state = node->state;
580 res->sop->prio = node->command.sop.prio;
582 done:
583 return code;
587 * cancel a pending salvage request.
589 * @param[in] com inbound command object
590 * @param[out] res outbound response object
592 * @return operation status
593 * @retval SYNC_OK success
594 * @retval SYNC_FAILED malformed command packet
596 * @note this is a SALVSYNC protocol rpc handler
598 * @internal
600 static afs_int32
601 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
603 afs_int32 code = SYNC_OK;
604 struct SalvageQueueNode * node;
606 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
607 code = SYNC_FAILED;
608 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
609 goto done;
612 node = LookupNodeByCommand(com->sop, NULL);
614 if (node == NULL) {
615 res->sop->state = SALVSYNC_STATE_UNKNOWN;
616 res->sop->prio = 0;
617 } else {
618 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
619 res->sop->prio = node->command.sop.prio;
620 res->sop->state = node->state;
621 if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
622 (node->state == SALVSYNC_STATE_QUEUED)) {
623 DeleteFromSalvageQueue(node);
627 done:
628 return code;
632 * cancel all pending salvage requests.
634 * @param[in] com incoming command object
635 * @param[out] res outbound response object
637 * @return operation status
638 * @retval SYNC_OK success
640 * @note this is a SALVSYNC protocol rpc handler
642 * @internal
644 static afs_int32
645 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
647 struct SalvageQueueNode * np, *nnp;
648 struct DiskPartition64 * dp;
650 for (dp = DiskPartitionList ; dp ; dp = dp->next) {
651 for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
652 DeleteFromSalvageQueue(np);
656 return SYNC_OK;
660 * link a queue node for a clone to its parent volume.
662 * @param[in] com inbound command object
663 * @param[out] res outbound response object
665 * @return operation status
666 * @retval SYNC_OK success
667 * @retval SYNC_FAILED malformed command packet
668 * @retval SYNC_DENIED the request could not be completed
670 * @note this is a SALVSYNC protocol rpc handler
672 * @post the requested volume is marked as a child of another volume.
673 * thus, future salvage requests for this volume will result in the
674 * parent of the volume group being scheduled for salvage instead
675 * of this clone.
677 * @internal
679 static afs_int32
680 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
682 afs_int32 code = SYNC_OK;
683 struct SalvageQueueNode * clone, * parent;
685 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
686 code = SYNC_FAILED;
687 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
688 goto done;
691 /* lookup clone's salvage scheduling node */
692 clone = LookupNodeByCommand(com->sop, NULL);
693 if (clone == NULL) {
694 code = SYNC_DENIED;
695 res->hdr->reason = SALVSYNC_REASON_ERROR;
696 goto done;
699 /* lookup parent's salvage scheduling node */
700 parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
701 if (parent == NULL) {
702 if (AllocNode(&parent)) {
703 code = SYNC_DENIED;
704 res->hdr->reason = SYNC_REASON_NOMEM;
705 goto done;
707 memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
708 memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
709 parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
710 AddNodeToHash(parent);
713 if (LinkNode(parent, clone)) {
714 code = SYNC_DENIED;
715 goto done;
718 done:
719 return code;
723 * query the status of a volume salvage request.
725 * @param[in] com inbound command object
726 * @param[out] res outbound response object
728 * @return operation status
729 * @retval SYNC_OK success
730 * @retval SYNC_FAILED malformed command packet
732 * @note this is a SALVSYNC protocol rpc handler
734 * @internal
736 static afs_int32
737 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
739 afs_int32 code = SYNC_OK;
740 struct SalvageQueueNode * node;
742 if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
743 code = SYNC_FAILED;
744 res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
745 goto done;
748 LookupNodeByCommand(com->sop, &node);
750 /* query whether a volume is done salvaging */
751 if (node == NULL) {
752 res->sop->state = SALVSYNC_STATE_UNKNOWN;
753 res->sop->prio = 0;
754 } else {
755 res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
756 res->sop->state = node->state;
757 res->sop->prio = node->command.sop.prio;
760 done:
761 return code;
764 static void
765 SALVSYNC_Drop(osi_socket fd)
767 RemoveHandler(fd);
768 #ifdef AFS_NT40_ENV
769 closesocket(fd);
770 #else
771 close(fd);
772 #endif
773 AcceptOn();
776 static int AcceptHandler = -1; /* handler id for accept, if turned on */
778 static void
779 AcceptOn(void)
781 if (AcceptHandler == -1) {
782 osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
783 AcceptHandler = FindHandler(salvsync_server_state.fd);
787 static void
788 AcceptOff(void)
790 if (AcceptHandler != -1) {
791 osi_Assert(RemoveHandler(salvsync_server_state.fd));
792 AcceptHandler = -1;
796 /* The multiple FD handling code. */
798 static void
799 InitHandler(void)
801 int i;
802 ObtainWriteLock(&SALVSYNC_handler_lock);
803 for (i = 0; i < MAXHANDLERS; i++) {
804 HandlerFD[i] = OSI_NULLSOCKET;
805 HandlerProc[i] = NULL;
807 ReleaseWriteLock(&SALVSYNC_handler_lock);
810 static void
811 CallHandler(fd_set * fdsetp)
813 int i;
814 ObtainReadLock(&SALVSYNC_handler_lock);
815 for (i = 0; i < MAXHANDLERS; i++) {
816 if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
817 ReleaseReadLock(&SALVSYNC_handler_lock);
818 (*HandlerProc[i]) (HandlerFD[i]);
819 ObtainReadLock(&SALVSYNC_handler_lock);
822 ReleaseReadLock(&SALVSYNC_handler_lock);
825 static int
826 AddHandler(osi_socket afd, void (*aproc) (int))
828 int i;
829 ObtainWriteLock(&SALVSYNC_handler_lock);
830 for (i = 0; i < MAXHANDLERS; i++)
831 if (HandlerFD[i] == OSI_NULLSOCKET)
832 break;
833 if (i >= MAXHANDLERS) {
834 ReleaseWriteLock(&SALVSYNC_handler_lock);
835 return 0;
837 HandlerFD[i] = afd;
838 HandlerProc[i] = aproc;
839 ReleaseWriteLock(&SALVSYNC_handler_lock);
840 return 1;
843 static int
844 FindHandler(osi_socket afd)
846 int i;
847 ObtainReadLock(&SALVSYNC_handler_lock);
848 for (i = 0; i < MAXHANDLERS; i++)
849 if (HandlerFD[i] == afd) {
850 ReleaseReadLock(&SALVSYNC_handler_lock);
851 return i;
853 ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
854 osi_Panic("Failed to find handler\n");
855 return -1; /* satisfy compiler */
858 static int
859 FindHandler_r(osi_socket afd)
861 int i;
862 for (i = 0; i < MAXHANDLERS; i++)
863 if (HandlerFD[i] == afd) {
864 return i;
866 osi_Panic("Failed to find handler\n");
867 return -1; /* satisfy compiler */
870 static int
871 RemoveHandler(osi_socket afd)
873 ObtainWriteLock(&SALVSYNC_handler_lock);
874 HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
875 ReleaseWriteLock(&SALVSYNC_handler_lock);
876 return 1;
879 static void
880 GetHandler(fd_set * fdsetp, int *maxfdp)
882 int i;
883 int maxfd = -1;
884 FD_ZERO(fdsetp);
885 ObtainReadLock(&SALVSYNC_handler_lock); /* just in case */
886 for (i = 0; i < MAXHANDLERS; i++)
887 if (HandlerFD[i] != OSI_NULLSOCKET) {
888 FD_SET(HandlerFD[i], fdsetp);
889 #ifndef AFS_NT40_ENV
890 /* On Windows the nfds parameter to select() is ignored */
891 if (maxfd < HandlerFD[i] || maxfd == (int)-1)
892 maxfd = HandlerFD[i];
893 #endif
895 *maxfdp = maxfd;
896 ReleaseReadLock(&SALVSYNC_handler_lock); /* just in case */
900 * allocate a salvage queue node.
902 * @param[out] node_out address in which to store new node pointer
904 * @return operation status
905 * @retval 0 success
906 * @retval 1 failed to allocate node
908 * @internal
910 static int
911 AllocNode(struct SalvageQueueNode ** node_out)
913 int code = 0;
914 struct SalvageQueueNode * node;
916 *node_out = node = (struct SalvageQueueNode *)
917 malloc(sizeof(struct SalvageQueueNode));
918 if (node == NULL) {
919 code = 1;
920 goto done;
923 memset(node, 0, sizeof(struct SalvageQueueNode));
924 node->type = SALVSYNC_VOLGROUP_PARENT;
925 node->state = SALVSYNC_STATE_UNKNOWN;
927 done:
928 return code;
932 * link a salvage queue node to its parent.
934 * @param[in] parent pointer to queue node for parent of volume group
935 * @param[in] clone pointer to queue node for a clone
937 * @return operation status
938 * @retval 0 success
939 * @retval 1 failure
941 * @internal
943 static int
944 LinkNode(struct SalvageQueueNode * parent,
945 struct SalvageQueueNode * clone)
947 int code = 0;
948 int idx;
950 /* check for attaching a clone to a clone */
951 if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
952 code = 1;
953 goto done;
956 /* check for pre-existing registration and openings */
957 for (idx = 0; idx < VOLMAXTYPES; idx++) {
958 if (parent->volgroup.children[idx] == clone) {
959 goto linked;
961 if (parent->volgroup.children[idx] == NULL) {
962 break;
965 if (idx == VOLMAXTYPES) {
966 code = 1;
967 goto done;
970 /* link parent and child */
971 parent->volgroup.children[idx] = clone;
972 clone->type = SALVSYNC_VOLGROUP_CLONE;
973 clone->volgroup.parent = parent;
976 linked:
977 switch (clone->state) {
978 case SALVSYNC_STATE_QUEUED:
979 DeleteFromSalvageQueue(clone);
981 case SALVSYNC_STATE_SALVAGING:
982 switch (parent->state) {
983 case SALVSYNC_STATE_UNKNOWN:
984 case SALVSYNC_STATE_ERROR:
985 case SALVSYNC_STATE_DONE:
986 parent->command.sop.prio = clone->command.sop.prio;
987 AddToSalvageQueue(parent);
988 break;
990 case SALVSYNC_STATE_QUEUED:
991 if (clone->command.sop.prio) {
992 parent->command.sop.prio += clone->command.sop.prio;
993 UpdateCommandPrio(parent);
995 break;
997 default:
998 break;
1000 break;
1002 default:
1003 break;
1006 done:
1007 return code;
1010 static void
1011 HandlePrio(struct SalvageQueueNode * clone,
1012 struct SalvageQueueNode * node,
1013 afs_uint32 new_prio)
1015 afs_uint32 delta;
1017 switch (node->state) {
1018 case SALVSYNC_STATE_ERROR:
1019 case SALVSYNC_STATE_DONE:
1020 case SALVSYNC_STATE_UNKNOWN:
1021 node->command.sop.prio = 0;
1022 break;
1023 default:
1024 break;
1027 if (new_prio < clone->command.sop.prio) {
1028 /* strange. let's just set our delta to 1 */
1029 delta = 1;
1030 } else {
1031 delta = new_prio - clone->command.sop.prio;
1034 if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
1035 clone->command.sop.prio = new_prio;
1038 node->command.sop.prio += delta;
1041 static int
1042 AddToSalvageQueue(struct SalvageQueueNode * node)
1044 afs_int32 id;
1045 struct SalvageQueueNode * last = NULL;
1047 id = volutil_GetPartitionID(node->command.sop.partName);
1048 if (id < 0 || id > VOLMAXPARTS) {
1049 return 1;
1051 if (!VGetPartitionById_r(id, 0)) {
1052 /* don't enqueue salvage requests for unmounted partitions */
1053 return 1;
1055 if (queue_IsOnQueue(node)) {
1056 return 0;
1059 if (queue_IsNotEmpty(&salvageQueue.part[id])) {
1060 last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
1062 queue_Append(&salvageQueue.part[id], node);
1063 salvageQueue.len[id]++;
1064 salvageQueue.total_len++;
1065 salvageQueue.last_insert = id;
1066 node->partition_id = id;
1067 node->state = SALVSYNC_STATE_QUEUED;
1069 /* reorder, if necessary */
1070 if (last && last->command.sop.prio < node->command.sop.prio) {
1071 UpdateCommandPrio(node);
1074 CV_BROADCAST(&salvageQueue.cv);
1075 return 0;
1078 static void
1079 DeleteFromSalvageQueue(struct SalvageQueueNode * node)
1081 if (queue_IsOnQueue(node)) {
1082 queue_Remove(node);
1083 salvageQueue.len[node->partition_id]--;
1084 salvageQueue.total_len--;
1085 node->state = SALVSYNC_STATE_UNKNOWN;
1086 CV_BROADCAST(&salvageQueue.cv);
1090 static void
1091 AddToPendingQueue(struct SalvageQueueNode * node)
1093 queue_Append(&pendingQueue, node);
1094 pendingQueue.len++;
1095 node->state = SALVSYNC_STATE_SALVAGING;
1096 CV_BROADCAST(&pendingQueue.queue_change_cv);
1099 static void
1100 DeleteFromPendingQueue(struct SalvageQueueNode * node)
1102 if (queue_IsOnQueue(node)) {
1103 queue_Remove(node);
1104 pendingQueue.len--;
1105 node->state = SALVSYNC_STATE_UNKNOWN;
1106 CV_BROADCAST(&pendingQueue.queue_change_cv);
#if 0
/**
 * find an in-progress salvage matching a command header.
 *
 * currently compiled out; retained for future use.
 *
 * @param[in] qry  command header carrying volume id and partition name
 *
 * @return matching pending node, or NULL
 *
 * @internal
 */
static struct SalvageQueueNode *
LookupPendingCommand(SALVSYNC_command_hdr * qry)
{
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
	if ((np->command.sop.volume == qry->volume) &&
	    !strncmp(np->command.sop.partName, qry->partName,
		     sizeof(qry->partName)))
	    break;
    }

    if (queue_IsEnd(&pendingQueue, np))
	np = NULL;
    return np;
}
#endif
1129 static struct SalvageQueueNode *
1130 LookupPendingCommandByPid(int pid)
1132 struct SalvageQueueNode * np, * nnp;
1134 for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
1135 if (np->pid == pid)
1136 break;
1139 if (queue_IsEnd(&pendingQueue, np))
1140 np = NULL;
1141 return np;
1145 /* raise the priority of a previously scheduled salvage */
1146 static void
1147 UpdateCommandPrio(struct SalvageQueueNode * node)
1149 struct SalvageQueueNode *np, *nnp;
1150 afs_int32 id;
1151 afs_uint32 prio;
1153 osi_Assert(queue_IsOnQueue(node));
1155 prio = node->command.sop.prio;
1156 id = node->partition_id;
1157 if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
1158 queue_Remove(node);
1159 queue_Prepend(&salvageQueue.part[id], node);
1160 } else {
1161 for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
1162 if (np->command.sop.prio > prio)
1163 break;
1165 if (queue_IsEnd(&salvageQueue.part[id], np)) {
1166 queue_Remove(node);
1167 queue_Prepend(&salvageQueue.part[id], node);
1168 } else if (node != np) {
1169 queue_Remove(node);
1170 queue_InsertAfter(np, node);
1175 /* this will need to be rearchitected if we ever want more than one thread
1176 * to wait for new salvage nodes */
1177 struct SalvageQueueNode *
1178 SALVSYNC_getWork(void)
1180 int i;
1181 struct DiskPartition64 * dp = NULL, * fdp;
1182 static afs_int32 next_part_sched = 0;
1183 struct SalvageQueueNode *node = NULL;
1185 VOL_LOCK;
1188 * wait for work to be scheduled
1189 * if there are no disk partitions, just sit in this wait loop forever
1191 while (!salvageQueue.total_len || !DiskPartitionList) {
1192 VOL_CV_WAIT(&salvageQueue.cv);
1196 * short circuit for simple case where only one partition has
1197 * scheduled salvages
1199 if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
1200 (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
1201 node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
1202 goto have_node;
1207 * ok, more than one partition has scheduled salvages.
1208 * now search for partitions with scheduled salvages, but no pending salvages.
1210 dp = VGetPartitionById_r(next_part_sched, 0);
1211 if (!dp) {
1212 dp = DiskPartitionList;
1214 fdp = dp;
1216 for (i=0 ;
1217 !i || dp != fdp ;
1218 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1219 if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
1220 node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1221 goto have_node;
1227 * all partitions with scheduled salvages have at least one pending.
1228 * now do an exhaustive search for a scheduled salvage.
1230 dp = fdp;
1232 for (i=0 ;
1233 !i || dp != fdp ;
1234 dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
1235 if (salvageQueue.len[dp->index]) {
1236 node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
1237 goto have_node;
1241 /* we should never reach this line */
1242 osi_Panic("Node not found\n");
1244 have_node:
1245 osi_Assert(node != NULL);
1246 node->pid = 0;
1247 partition_salvaging[node->partition_id]++;
1248 DeleteFromSalvageQueue(node);
1249 AddToPendingQueue(node);
1251 if (dp) {
1252 /* update next_part_sched field */
1253 if (dp->next) {
1254 next_part_sched = dp->next->index;
1255 } else if (DiskPartitionList) {
1256 next_part_sched = DiskPartitionList->index;
1257 } else {
1258 next_part_sched = -1;
1262 VOL_UNLOCK;
1263 return node;
1267 * update internal scheduler state to reflect completion of a work unit.
1269 * @param[in] node salvage queue node object pointer
1270 * @param[in] result worker process result code
1272 * @post scheduler state is updated.
1274 * @internal
1276 static void
1277 SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
1279 afs_int32 partid;
1280 int idx;
1282 DeleteFromPendingQueue(node);
1283 partid = node->partition_id;
1284 if (partid >=0 && partid <= VOLMAXPARTS) {
1285 partition_salvaging[partid]--;
1287 if (result == 0) {
1288 node->state = SALVSYNC_STATE_DONE;
1289 } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
1290 node->state = SALVSYNC_STATE_ERROR;
1293 if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1294 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1295 if (node->volgroup.children[idx]) {
1296 node->volgroup.children[idx]->state = node->state;
1303 * check whether worker child failed.
1305 * @param[in] status status bitfield return by wait()
1307 * @return boolean failure code
1308 * @retval 0 child succeeded
1309 * @retval 1 child failed
1311 * @internal
1313 static int
1314 ChildFailed(int status)
1316 return (WCOREDUMP(status) ||
1317 WIFSIGNALED(status) ||
1318 ((WEXITSTATUS(status) != 0) &&
1319 (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
1324 * notify salvsync scheduler of node completion, by child pid.
1326 * @param[in] pid pid of worker child
1327 * @param[in] status worker status bitfield from wait()
1329 * @post scheduler state is updated.
1330 * if status code is a failure, fileserver notification was attempted
1332 * @see SALVSYNC_doneWork_r
1334 void
1335 SALVSYNC_doneWorkByPid(int pid, int status)
1337 struct SalvageQueueNode * node;
1338 char partName[16];
1339 afs_uint32 volids[VOLMAXTYPES+1];
1340 unsigned int idx;
1342 memset(volids, 0, sizeof(volids));
1344 VOL_LOCK;
1345 node = LookupPendingCommandByPid(pid);
1346 if (node != NULL) {
1347 SALVSYNC_doneWork_r(node, status);
1349 if (ChildFailed(status)) {
1350 /* populate volume id list for later processing outside the glock */
1351 volids[0] = node->command.sop.volume;
1352 strcpy(partName, node->command.sop.partName);
1353 if (node->type == SALVSYNC_VOLGROUP_PARENT) {
1354 for (idx = 0; idx < VOLMAXTYPES; idx++) {
1355 if (node->volgroup.children[idx]) {
1356 volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
1362 VOL_UNLOCK;
1365 * if necessary, notify fileserver of
1366 * failure to salvage volume group
1367 * [we cannot guarantee that the child made the
1368 * appropriate notifications (e.g. SIGSEGV)]
1369 * -- tkeiser 11/28/2007
1371 if (ChildFailed(status)) {
1372 for (idx = 0; idx <= VOLMAXTYPES; idx++) {
1373 if (volids[idx]) {
1374 FSYNC_VolOp(volids[idx],
1375 partName,
1376 FSYNC_VOL_FORCE_ERROR,
1377 FSYNC_WHATEVER,
1378 NULL);
1384 #endif /* AFS_DEMAND_ATTACH_FS */