/*
 * Copyright 2006-2008, Sine Nomine Associates and others.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */

/*
 * OpenAFS demand attach fileserver
 * Salvage server synchronization with fileserver.
 */
/* This controls the size of an fd_set; it must be defined early before
 * the system headers define that type and the macros that operate on it.
 * Its value should be as large as the maximum file descriptor limit we
 * are likely to run into on any platform.  Right now, that is 65536
 * which is the default hard fd limit on Solaris 9 */
#define FD_SETSIZE 65536
#include <afsconfig.h>
#include <afs/param.h>

#include <sys/types.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <afs/afs_assert.h>
#include <afs/afsint.h>
#include <afs/errors.h>
#include <afs/afssyscalls.h>

#include "partition.h"
#include <rx/rx_queue.h>
#include <afs/procmgmt.h>

#if !defined(offsetof)
#include <stddef.h>
#endif

#ifdef USE_UNIX_SOCKETS
#include <afs/afsutil.h>
#include <sys/un.h>
#endif

#ifndef WCOREDUMP
#define WCOREDUMP(x) ((x) & 0200)
#endif
#define MAXHANDLERS 4           /* Up to 4 clients; must be at least 2, so that
                                 * move = dump+restore can run on single server */

/**
 * This lock controls access to the handler array.
 */
struct Lock SALVSYNC_handler_lock;
#ifdef AFS_DEMAND_ATTACH_FS
/*
 * SALVSYNC is a feature specific to the demand attach fileserver
 */
/* Forward declarations */
static void * SALVSYNC_syncThread(void *);
static void SALVSYNC_newconnection(osi_socket fd);
static void SALVSYNC_com(osi_socket fd);
static void SALVSYNC_Drop(osi_socket fd);
static void AcceptOn(void);
static void AcceptOff(void);
static void InitHandler(void);
static void CallHandler(fd_set * fdsetp);
static int AddHandler(osi_socket afd, void (*aproc) (int));
static int FindHandler(osi_socket afd);
static int FindHandler_r(osi_socket afd);
static int RemoveHandler(osi_socket afd);
static void GetHandler(fd_set * fdsetp, int *maxfdp);

static int AllocNode(struct SalvageQueueNode ** node);

static int AddToSalvageQueue(struct SalvageQueueNode * node);
static void DeleteFromSalvageQueue(struct SalvageQueueNode * node);
static void AddToPendingQueue(struct SalvageQueueNode * node);
static void DeleteFromPendingQueue(struct SalvageQueueNode * node);
static struct SalvageQueueNode * LookupPendingCommandByPid(int pid);
static void UpdateCommandPrio(struct SalvageQueueNode * node);
static void HandlePrio(struct SalvageQueueNode * clone,
                       struct SalvageQueueNode * parent,
                       afs_uint32 new_prio);

static int LinkNode(struct SalvageQueueNode * parent,
                    struct SalvageQueueNode * clone);

static struct SalvageQueueNode * LookupNode(VolumeId vid, char * partName,
                                            struct SalvageQueueNode ** parent);
static struct SalvageQueueNode * LookupNodeByCommand(SALVSYNC_command_hdr * qry,
                                                     struct SalvageQueueNode ** parent);
static void AddNodeToHash(struct SalvageQueueNode * node);

static afs_int32 SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res);
static afs_int32 SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res);
extern pthread_mutex_t vol_salvsync_mutex;
/**
 * salvsync server socket handle.
 */
static SYNC_server_state_t salvsync_server_state =
    { OSI_NULLSOCKET,                       /* file descriptor */
      SALVSYNC_ENDPOINT_DECL,               /* server endpoint */
      SALVSYNC_PROTO_VERSION,               /* protocol version */
      5,                                    /* bind() retry limit */
      100,                                  /* listen() queue depth */
      "SALVSYNC",                           /* protocol name string */
    };
/**
 * queue of all volumes waiting to be salvaged.
 */
struct SalvageQueue {
    volatile int total_len;
    volatile afs_int32 last_insert;    /**< id of last partition to have a salvage node inserted */
    volatile int len[VOLMAXPARTS+1];
    volatile struct rx_queue part[VOLMAXPARTS+1];    /**< per-partition queues of pending salvages */
    pthread_cond_t cv;
};
static struct SalvageQueue salvageQueue;    /* volumes waiting to be salvaged */
/**
 * queue of all volumes currently being salvaged.
 */
struct QueueHead {
    volatile struct rx_queue q;    /**< queue of salvages in progress */
    volatile int len;              /**< length of in-progress queue */
    pthread_cond_t queue_change_cv;
};
static struct QueueHead pendingQueue;    /* volumes being salvaged */
/**
 * whether a partition has a salvage in progress
 *
 * the salvager code only permits one salvage per partition at a time
 *
 * the following hack tries to keep salvaged parallelism high by
 * only permitting one salvage dispatch per partition at a time
 *
 * unfortunately, the parallel salvager currently
 * has a rather braindead routine that won't permit
 * multiple salvages on the same "device".  this
 * function happens to break pretty badly on lvm, raid luns, etc.
 *
 * this hack isn't good enough to stop the device limiting code from
 * crippling performance.  someday that code needs to be rewritten
 */
static int partition_salvaging[VOLMAXPARTS+1];
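
/*
 * For illustration, the gate this hack implies is the one SALVSYNC_getWork()
 * applies further down: a partition is only handed another dispatch when it
 * has queued work and nothing currently pending, i.e. roughly
 *
 *     if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index])
 *         dispatch the head of salvageQueue.part[dp->index];
 */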
static int HandlerFD[MAXHANDLERS];
static void (*HandlerProc[MAXHANDLERS]) (int);
#define VSHASH_SIZE 64
#define VSHASH_MASK (VSHASH_SIZE-1)
#define VSHASH(vid) ((vid)&VSHASH_MASK)

static struct QueueHead SalvageHashTable[VSHASH_SIZE];
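
/*
 * Worked example: VSHASH() keeps only the low-order bits of the volume id,
 * so with VSHASH_SIZE == 64 a volume id of, say, 536870918 lands in bucket
 * 536870918 & 63 == 6.  Entries sharing a bucket are told apart by the
 * volume id and partName comparison in LookupNode() below.
 */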
static struct SalvageQueueNode *
LookupNode(afs_uint32 vid, char * partName,
           struct SalvageQueueNode ** parent)
{
    struct rx_queue *qp, *nqp;
    struct SalvageQueueNode *vsp;
    int idx = VSHASH(vid);

    for (queue_Scan(&SalvageHashTable[idx], qp, nqp, rx_queue)) {
        vsp = (struct SalvageQueueNode *)((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
        if ((vsp->command.sop.volume == vid) &&
            !strncmp(vsp->command.sop.partName, partName, sizeof(vsp->command.sop.partName))) {
            break;
        }
    }

    if (queue_IsEnd(&SalvageHashTable[idx], qp)) {
        vsp = NULL;
    }

    if (parent) {
        if (vsp) {
            *parent = (vsp->type == SALVSYNC_VOLGROUP_CLONE) ?
                vsp->volgroup.parent : vsp;
        } else {
            *parent = NULL;
        }
    }

    return vsp;
}
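
/*
 * Note: the hash buckets chain the rx_queue link embedded in
 * struct SalvageQueueNode as its hash_chain member, so the scan above
 * recovers the containing node from a queue pointer with the usual
 * offsetof() idiom:
 *
 *     node = (struct SalvageQueueNode *)
 *         ((char *)qp - offsetof(struct SalvageQueueNode, hash_chain));
 */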
static struct SalvageQueueNode *
LookupNodeByCommand(SALVSYNC_command_hdr * qry,
                    struct SalvageQueueNode ** parent)
{
    return LookupNode(qry->volume, qry->partName, parent);
}
static void
AddNodeToHash(struct SalvageQueueNode * node)
{
    int idx = VSHASH(node->command.sop.volume);

    if (queue_IsOnQueue(&node->hash_chain)) {
        return;
    }

    queue_Append(&SalvageHashTable[idx], &node->hash_chain);
    SalvageHashTable[idx].len++;
}
static void
DeleteNodeFromHash(struct SalvageQueueNode * node)
{
    int idx = VSHASH(node->command.sop.volume);

    if (queue_IsNotOnQueue(&node->hash_chain)) {
        return;
    }

    queue_Remove(&node->hash_chain);
    SalvageHashTable[idx].len--;
}
void
SALVSYNC_salvInit(void)
{
    int i;
    pthread_t tid;
    pthread_attr_t tattr;

    /* initialize the queues */
    Lock_Init(&SALVSYNC_handler_lock);
    CV_INIT(&salvageQueue.cv, "sq", CV_DEFAULT, 0);
    for (i = 0; i <= VOLMAXPARTS; i++) {
        queue_Init(&salvageQueue.part[i]);
        salvageQueue.len[i] = 0;
    }
    CV_INIT(&pendingQueue.queue_change_cv, "queuechange", CV_DEFAULT, 0);
    queue_Init(&pendingQueue);
    salvageQueue.total_len = pendingQueue.len = 0;
    salvageQueue.last_insert = -1;
    memset(partition_salvaging, 0, sizeof(partition_salvaging));

    for (i = 0; i < VSHASH_SIZE; i++) {
        CV_INIT(&SalvageHashTable[i].queue_change_cv, "queuechange", CV_DEFAULT, 0);
        SalvageHashTable[i].len = 0;
        queue_Init(&SalvageHashTable[i]);
    }

    /* start the salvsync thread */
    osi_Assert(pthread_attr_init(&tattr) == 0);
    osi_Assert(pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) == 0);
    osi_Assert(pthread_create(&tid, &tattr, SALVSYNC_syncThread, NULL) == 0);
}
static void
CleanFDs(void)
{
    int i;

    for (i = 0; i < MAXHANDLERS; ++i) {
        if (HandlerFD[i] >= 0) {
            SALVSYNC_Drop(HandlerFD[i]);
        }
    }

    /* just in case we were in AcceptOff mode, and thus this fd wouldn't
     * be cleaned up by the loop above */
    close(salvsync_server_state.fd);
    salvsync_server_state.fd = OSI_NULLSOCKET;
}
static fd_set SALVSYNC_readfds;
static void *
SALVSYNC_syncThread(void * args)
{
    int code;
    SYNC_server_state_t * state = &salvsync_server_state;

    /* when we fork, the child needs to close the salvsync server sockets,
     * otherwise, it may get salvsync requests, instead of the parent */
    osi_Assert(pthread_atfork(NULL, NULL, CleanFDs) == 0);

    SYNC_getAddr(&state->endpoint, &state->addr);
    SYNC_cleanupSock(state);

    (void)signal(SIGPIPE, SIG_IGN);

    state->fd = SYNC_getSock(&state->endpoint);
    code = SYNC_bindSock(state);
    osi_Assert(!code);
    InitHandler();
    AcceptOn();

    for (;;) {
        int maxfd = 0;
        struct timeval s_timeout;

        GetHandler(&SALVSYNC_readfds, &maxfd);
        s_timeout.tv_sec = SYNC_SELECT_TIMEOUT;
        s_timeout.tv_usec = 0;
        /* Note: check for >= 1 below is essential since IOMGR_select
         * doesn't have exactly same semantics as select. */
        if (select(maxfd + 1, &SALVSYNC_readfds, NULL, NULL, &s_timeout) >= 1)
            CallHandler(&SALVSYNC_readfds);
    }

    return NULL;
}
static void
SALVSYNC_newconnection(int afd)
{
#ifdef USE_UNIX_SOCKETS
    struct sockaddr_un other;
#else /* USE_UNIX_SOCKETS */
    struct sockaddr_in other;
#endif /* USE_UNIX_SOCKETS */
    osi_socket fd;
    socklen_t junk;

    junk = sizeof(other);
    fd = accept(afd, (struct sockaddr *)&other, &junk);
    if (fd == OSI_NULLSOCKET) {
        osi_Panic("SALVSYNC_newconnection: accept failed, errno==%d\n", errno);
    } else if (!AddHandler(fd, SALVSYNC_com)) {
        AcceptOff();
        osi_Assert(AddHandler(fd, SALVSYNC_com));
    }
}
/* this function processes commands from a salvsync file descriptor (fd) */
static afs_int32 SALV_cnt = 0;
static void
SALVSYNC_com(osi_socket fd)
{
    SYNC_command com;
    SYNC_response res;
    SALVSYNC_response_hdr sres_hdr;
    SALVSYNC_command scom;
    SALVSYNC_response sres;
    SYNC_PROTO_BUF_DECL(buf);

    memset(&com, 0, sizeof(com));
    memset(&res, 0, sizeof(res));
    memset(&scom, 0, sizeof(scom));
    memset(&sres, 0, sizeof(sres));
    memset(&sres_hdr, 0, sizeof(sres_hdr));

    com.payload.buf = (void *)buf;
    com.payload.len = SYNC_PROTO_MAX_LEN;
    res.payload.buf = (void *) &sres_hdr;
    res.payload.len = sizeof(sres_hdr);
    res.hdr.response_len = sizeof(res.hdr) + sizeof(sres_hdr);
    res.hdr.proto_version = SALVSYNC_PROTO_VERSION;

    scom.hdr = &com.hdr;
    scom.sop = (SALVSYNC_command_hdr *) buf;
    scom.com = &com;
    sres.hdr = &res.hdr;
    sres.sop = &sres_hdr;
    sres.res = &res;

    if (SYNC_getCom(&salvsync_server_state, fd, &com)) {
        Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt);
        SALVSYNC_Drop(fd);
        return;
    }

    if (com.recv_len < sizeof(com.hdr)) {
        Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
        res.hdr.response = SYNC_COM_ERROR;
        res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
        res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
        goto respond;
    }

    if (com.hdr.proto_version != SALVSYNC_PROTO_VERSION) {
        Log("SALVSYNC_com: invalid protocol version (%u)\n", com.hdr.proto_version);
        res.hdr.response = SYNC_COM_ERROR;
        res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
        goto respond;
    }

    if (com.hdr.command == SYNC_COM_CHANNEL_CLOSE) {
        res.hdr.response = SYNC_OK;
        res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;

        /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
         * never wait for a response. */
        SALVSYNC_Drop(fd);
        return;
    }

    if (com.recv_len != (sizeof(com.hdr) + sizeof(SALVSYNC_command_hdr))) {
        Log("SALVSYNC_com: invalid protocol message length (%u)\n", com.recv_len);
        res.hdr.response = SYNC_COM_ERROR;
        res.hdr.reason = SYNC_REASON_MALFORMED_PACKET;
        res.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
        goto respond;
    }

    res.hdr.com_seq = com.hdr.com_seq;

    VOL_LOCK;
    switch (com.hdr.command) {
    case SALVSYNC_SALVAGE:
    case SALVSYNC_RAISEPRIO:
        res.hdr.response = SALVSYNC_com_Salvage(&scom, &sres);
        break;
    case SALVSYNC_CANCEL:
        /* cancel a salvage */
        res.hdr.response = SALVSYNC_com_Cancel(&scom, &sres);
        break;
    case SALVSYNC_CANCELALL:
        /* cancel all queued salvages */
        res.hdr.response = SALVSYNC_com_CancelAll(&scom, &sres);
        break;
    case SALVSYNC_QUERY:
        /* query whether a volume is done salvaging */
        res.hdr.response = SALVSYNC_com_Query(&scom, &sres);
        break;
    case SALVSYNC_OP_LINK:
        /* link a clone to its parent in the scheduler */
        res.hdr.response = SALVSYNC_com_Link(&scom, &sres);
        break;
    default:
        res.hdr.response = SYNC_BAD_COMMAND;
        break;
    }

    sres_hdr.sq_len = salvageQueue.total_len;
    sres_hdr.pq_len = pendingQueue.len;
    VOL_UNLOCK;

 respond:
    SYNC_putRes(&salvsync_server_state, fd, &res);
    if (res.hdr.flags & SYNC_FLAG_CHANNEL_SHUTDOWN) {
        SALVSYNC_Drop(fd);
    }
}
/**
 * request that a volume be salvaged.
 *
 * @param[in]  com  inbound command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *    @retval SYNC_DENIED failed to enqueue request
 *    @retval SYNC_FAILED malformed command packet
 *
 * @note this is a SALVSYNC protocol rpc handler
 *
 * @post the volume is enqueued in the to-be-salvaged queue.
 *       if the volume was already in the salvage queue, its
 *       priority (and thus its location in the queue) are
 *       updated.
 */
static afs_int32
SALVSYNC_com_Salvage(SALVSYNC_command * com, SALVSYNC_response * res)
{
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node, * clone;
    int hash_node = 0;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        code = SYNC_FAILED;
        res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
        goto done;
    }

    clone = LookupNodeByCommand(com->sop, &node);

    if (node == NULL) {
        if (AllocNode(&node)) {
            code = SYNC_DENIED;
            res->hdr->reason = SYNC_REASON_NOMEM;
            goto done;
        }
        clone = node;
        hash_node = 1;
    }

    HandlePrio(clone, node, com->sop->prio);

    switch (node->state) {
    case SALVSYNC_STATE_QUEUED:
        UpdateCommandPrio(node);
        break;

    case SALVSYNC_STATE_ERROR:
    case SALVSYNC_STATE_DONE:
    case SALVSYNC_STATE_UNKNOWN:
        memcpy(&clone->command.com, com->hdr, sizeof(SYNC_command_hdr));
        memcpy(&clone->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));

        /*
         * make sure volgroup parent partition path is kept coherent
         *
         * If we ever want to support non-COW clones on a machine holding
         * the RW site, please note that this code does not work under the
         * conditions where someone zaps a COW clone on partition X, and
         * subsequently creates a full clone on partition Y -- we'd need
         * an inverse to SALVSYNC_com_Link.
         *  -- tkeiser 11/28/2007
         */
        strcpy(node->command.sop.partName, com->sop->partName);

        if (AddToSalvageQueue(node)) {
            code = SYNC_DENIED;
        }
        break;

    default:
        break;
    }

    if (hash_node) {
        AddNodeToHash(node);
    }

    res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
    res->sop->state = node->state;
    res->sop->prio = node->command.sop.prio;

 done:
    return code;
}
/**
 * cancel a pending salvage request.
 *
 * @param[in]  com  inbound command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *    @retval SYNC_FAILED malformed command packet
 *
 * @note this is a SALVSYNC protocol rpc handler
 */
static afs_int32
SALVSYNC_com_Cancel(SALVSYNC_command * com, SALVSYNC_response * res)
{
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        code = SYNC_FAILED;
        res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
        goto done;
    }

    node = LookupNodeByCommand(com->sop, NULL);

    if (node == NULL) {
        res->sop->state = SALVSYNC_STATE_UNKNOWN;
        res->sop->prio = 0;
    } else {
        res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
        res->sop->prio = node->command.sop.prio;
        res->sop->state = node->state;
        if ((node->type == SALVSYNC_VOLGROUP_PARENT) &&
            (node->state == SALVSYNC_STATE_QUEUED)) {
            DeleteFromSalvageQueue(node);
        }
    }

 done:
    return code;
}
/**
 * cancel all pending salvage requests.
 *
 * @param[in]  com  incoming command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *
 * @note this is a SALVSYNC protocol rpc handler
 */
static afs_int32
SALVSYNC_com_CancelAll(SALVSYNC_command * com, SALVSYNC_response * res)
{
    struct SalvageQueueNode * np, *nnp;
    struct DiskPartition64 * dp;

    for (dp = DiskPartitionList ; dp ; dp = dp->next) {
        for (queue_Scan(&salvageQueue.part[dp->index], np, nnp, SalvageQueueNode)) {
            DeleteFromSalvageQueue(np);
        }
    }

    return SYNC_OK;
}
/**
 * link a queue node for a clone to its parent volume.
 *
 * @param[in]  com  inbound command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *    @retval SYNC_FAILED malformed command packet
 *    @retval SYNC_DENIED the request could not be completed
 *
 * @note this is a SALVSYNC protocol rpc handler
 *
 * @post the requested volume is marked as a child of another volume.
 *       thus, future salvage requests for this volume will result in the
 *       parent of the volume group being scheduled for salvage instead
 *       of this clone.
 */
static afs_int32
SALVSYNC_com_Link(SALVSYNC_command * com, SALVSYNC_response * res)
{
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * clone, * parent;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        code = SYNC_FAILED;
        res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
        goto done;
    }

    /* lookup clone's salvage scheduling node */
    clone = LookupNodeByCommand(com->sop, NULL);
    if (clone == NULL) {
        code = SYNC_DENIED;
        res->hdr->reason = SALVSYNC_REASON_ERROR;
        goto done;
    }

    /* lookup parent's salvage scheduling node */
    parent = LookupNode(com->sop->parent, com->sop->partName, NULL);
    if (parent == NULL) {
        if (AllocNode(&parent)) {
            code = SYNC_DENIED;
            res->hdr->reason = SYNC_REASON_NOMEM;
            goto done;
        }
        memcpy(&parent->command.com, com->hdr, sizeof(SYNC_command_hdr));
        memcpy(&parent->command.sop, com->sop, sizeof(SALVSYNC_command_hdr));
        parent->command.sop.volume = parent->command.sop.parent = com->sop->parent;
        AddNodeToHash(parent);
    }

    if (LinkNode(parent, clone)) {
        code = SYNC_DENIED;
        goto done;
    }

 done:
    return code;
}
/**
 * query the status of a volume salvage request.
 *
 * @param[in]  com  inbound command object
 * @param[out] res  outbound response object
 *
 * @return operation status
 *    @retval SYNC_OK success
 *    @retval SYNC_FAILED malformed command packet
 *
 * @note this is a SALVSYNC protocol rpc handler
 */
static afs_int32
SALVSYNC_com_Query(SALVSYNC_command * com, SALVSYNC_response * res)
{
    afs_int32 code = SYNC_OK;
    struct SalvageQueueNode * node;

    if (SYNC_verifyProtocolString(com->sop->partName, sizeof(com->sop->partName))) {
        code = SYNC_FAILED;
        res->hdr->reason = SYNC_REASON_MALFORMED_PACKET;
        goto done;
    }

    LookupNodeByCommand(com->sop, &node);

    /* query whether a volume is done salvaging */
    if (node == NULL) {
        res->sop->state = SALVSYNC_STATE_UNKNOWN;
        res->sop->prio = 0;
    } else {
        res->hdr->flags |= SALVSYNC_FLAG_VOL_STATS_VALID;
        res->sop->state = node->state;
        res->sop->prio = node->command.sop.prio;
    }

 done:
    return code;
}
static void
SALVSYNC_Drop(osi_socket fd)
static int AcceptHandler = -1;    /* handler id for accept, if turned on */
static void
AcceptOn(void)
{
    if (AcceptHandler == -1) {
        osi_Assert(AddHandler(salvsync_server_state.fd, SALVSYNC_newconnection));
        AcceptHandler = FindHandler(salvsync_server_state.fd);
    }
}

static void
AcceptOff(void)
{
    if (AcceptHandler != -1) {
        osi_Assert(RemoveHandler(salvsync_server_state.fd));
        AcceptHandler = -1;
    }
}
/* The multiple FD handling code. */

static void
InitHandler(void)
{
    int i;

    ObtainWriteLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++) {
        HandlerFD[i] = OSI_NULLSOCKET;
        HandlerProc[i] = NULL;
    }
    ReleaseWriteLock(&SALVSYNC_handler_lock);
}
static void
CallHandler(fd_set * fdsetp)
{
    int i;

    ObtainReadLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++) {
        if (HandlerFD[i] >= 0 && FD_ISSET(HandlerFD[i], fdsetp)) {
            ReleaseReadLock(&SALVSYNC_handler_lock);
            (*HandlerProc[i]) (HandlerFD[i]);
            ObtainReadLock(&SALVSYNC_handler_lock);
        }
    }
    ReleaseReadLock(&SALVSYNC_handler_lock);
}
static int
AddHandler(osi_socket afd, void (*aproc) (int))
{
    int i;

    ObtainWriteLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++)
        if (HandlerFD[i] == OSI_NULLSOCKET)
            break;
    if (i >= MAXHANDLERS) {
        ReleaseWriteLock(&SALVSYNC_handler_lock);
        return 0;
    }
    HandlerFD[i] = afd;
    HandlerProc[i] = aproc;
    ReleaseWriteLock(&SALVSYNC_handler_lock);
    return 1;
}
static int
FindHandler(osi_socket afd)
{
    int i;

    ObtainReadLock(&SALVSYNC_handler_lock);
    for (i = 0; i < MAXHANDLERS; i++)
        if (HandlerFD[i] == afd) {
            ReleaseReadLock(&SALVSYNC_handler_lock);
            return i;
        }
    ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
    osi_Panic("Failed to find handler\n");
    return -1;                  /* satisfy compiler */
}
static int
FindHandler_r(osi_socket afd)
{
    int i;

    for (i = 0; i < MAXHANDLERS; i++)
        if (HandlerFD[i] == afd) {
            return i;
        }
    osi_Panic("Failed to find handler\n");
    return -1;                  /* satisfy compiler */
}
static int
RemoveHandler(osi_socket afd)
{
    ObtainWriteLock(&SALVSYNC_handler_lock);
    HandlerFD[FindHandler_r(afd)] = OSI_NULLSOCKET;
    ReleaseWriteLock(&SALVSYNC_handler_lock);
    return 1;
}
static void
GetHandler(fd_set * fdsetp, int *maxfdp)
{
    int i;
    int maxfd = -1;

    FD_ZERO(fdsetp);
    ObtainReadLock(&SALVSYNC_handler_lock);    /* just in case */
    for (i = 0; i < MAXHANDLERS; i++)
        if (HandlerFD[i] != OSI_NULLSOCKET) {
            FD_SET(HandlerFD[i], fdsetp);
            /* On Windows the nfds parameter to select() is ignored */
            if (maxfd < HandlerFD[i] || maxfd == (int)-1)
                maxfd = HandlerFD[i];
        }
    *maxfdp = maxfd;
    ReleaseReadLock(&SALVSYNC_handler_lock);    /* just in case */
}
/**
 * allocate a salvage queue node.
 *
 * @param[out] node_out  address in which to store new node pointer
 *
 * @return operation status
 *    @retval 1 failed to allocate node
 */
static int
AllocNode(struct SalvageQueueNode ** node_out)
{
    struct SalvageQueueNode * node;

    *node_out = node = (struct SalvageQueueNode *)
        malloc(sizeof(struct SalvageQueueNode));
    if (node == NULL) {
        return 1;
    }

    memset(node, 0, sizeof(struct SalvageQueueNode));
    node->type = SALVSYNC_VOLGROUP_PARENT;
    node->state = SALVSYNC_STATE_UNKNOWN;
    return 0;
}
/**
 * link a salvage queue node to its parent.
 *
 * @param[in] parent  pointer to queue node for parent of volume group
 * @param[in] clone   pointer to queue node for a clone
 *
 * @return operation status
 */
static int
LinkNode(struct SalvageQueueNode * parent,
         struct SalvageQueueNode * clone)
{
    afs_int32 code = 0;
    int idx;

    /* check for attaching a clone to a clone */
    if (parent->type != SALVSYNC_VOLGROUP_PARENT) {
        code = 1;
        goto done;
    }

    /* check for pre-existing registration and openings */
    for (idx = 0; idx < VOLMAXTYPES; idx++) {
        if (parent->volgroup.children[idx] == clone) {
            goto linked;
        }
        if (parent->volgroup.children[idx] == NULL) {
            break;
        }
    }
    if (idx == VOLMAXTYPES) {
        code = 1;
        goto done;
    }

    /* link parent and child */
    parent->volgroup.children[idx] = clone;
    clone->type = SALVSYNC_VOLGROUP_CLONE;
    clone->volgroup.parent = parent;

 linked:
    switch (clone->state) {
    case SALVSYNC_STATE_QUEUED:
        DeleteFromSalvageQueue(clone);

    case SALVSYNC_STATE_SALVAGING:
        switch (parent->state) {
        case SALVSYNC_STATE_UNKNOWN:
        case SALVSYNC_STATE_ERROR:
        case SALVSYNC_STATE_DONE:
            parent->command.sop.prio = clone->command.sop.prio;
            AddToSalvageQueue(parent);
            break;

        case SALVSYNC_STATE_QUEUED:
            if (clone->command.sop.prio) {
                parent->command.sop.prio += clone->command.sop.prio;
                UpdateCommandPrio(parent);
            }
            break;

        default:
            break;
        }
        break;

    default:
        break;
    }

 done:
    return code;
}
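
/*
 * Worked example: if the parent is already SALVSYNC_STATE_QUEUED with, say,
 * prio 5 and a clone carrying prio 3 is linked to it, the branch above
 * raises the parent's prio to 8 and calls UpdateCommandPrio(parent) so the
 * parent moves ahead of lower-priority entries on its partition queue.
 */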
static void
HandlePrio(struct SalvageQueueNode * clone,
           struct SalvageQueueNode * node,
           afs_uint32 new_prio)
{
    afs_uint32 delta;

    switch (node->state) {
    case SALVSYNC_STATE_ERROR:
    case SALVSYNC_STATE_DONE:
    case SALVSYNC_STATE_UNKNOWN:
        node->command.sop.prio = 0;
        break;
    default:
        break;
    }

    if (new_prio < clone->command.sop.prio) {
        /* strange. let's just set our delta to 1 */
        delta = 1;
    } else {
        delta = new_prio - clone->command.sop.prio;
    }

    if (clone->type == SALVSYNC_VOLGROUP_CLONE) {
        clone->command.sop.prio = new_prio;
    }

    node->command.sop.prio += delta;
}
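
/*
 * Worked example: if the clone's recorded prio is 2 and a new request
 * arrives with prio 7, delta is 7 - 2 == 5 and the scheduling node's prio
 * grows by 5; if the new prio is somehow lower than the recorded one, the
 * delta is clamped to 1 as noted above.
 */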
static int
AddToSalvageQueue(struct SalvageQueueNode * node)
{
    afs_int32 id;
    struct SalvageQueueNode * last = NULL;

    id = volutil_GetPartitionID(node->command.sop.partName);
    if (id < 0 || id > VOLMAXPARTS) {
        return 1;
    }
    if (!VGetPartitionById_r(id, 0)) {
        /* don't enqueue salvage requests for unmounted partitions */
        return 1;
    }
    if (queue_IsOnQueue(node)) {
        return 0;
    }

    if (queue_IsNotEmpty(&salvageQueue.part[id])) {
        last = queue_Last(&salvageQueue.part[id], SalvageQueueNode);
    }
    queue_Append(&salvageQueue.part[id], node);
    salvageQueue.len[id]++;
    salvageQueue.total_len++;
    salvageQueue.last_insert = id;
    node->partition_id = id;
    node->state = SALVSYNC_STATE_QUEUED;

    /* reorder, if necessary */
    if (last && last->command.sop.prio < node->command.sop.prio) {
        UpdateCommandPrio(node);
    }

    CV_BROADCAST(&salvageQueue.cv);
    return 0;
}
static void
DeleteFromSalvageQueue(struct SalvageQueueNode * node)
{
    if (queue_IsOnQueue(node)) {
        queue_Remove(node);
        salvageQueue.len[node->partition_id]--;
        salvageQueue.total_len--;
        node->state = SALVSYNC_STATE_UNKNOWN;
        CV_BROADCAST(&salvageQueue.cv);
    }
}
static void
AddToPendingQueue(struct SalvageQueueNode * node)
{
    queue_Append(&pendingQueue, node);
    pendingQueue.len++;
    node->state = SALVSYNC_STATE_SALVAGING;
    CV_BROADCAST(&pendingQueue.queue_change_cv);
}
static void
DeleteFromPendingQueue(struct SalvageQueueNode * node)
{
    if (queue_IsOnQueue(node)) {
        queue_Remove(node);
        pendingQueue.len--;
        node->state = SALVSYNC_STATE_UNKNOWN;
        CV_BROADCAST(&pendingQueue.queue_change_cv);
    }
}
static struct SalvageQueueNode *
LookupPendingCommand(SALVSYNC_command_hdr * qry)
{
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
        if ((np->command.sop.volume == qry->volume) &&
            !strncmp(np->command.sop.partName, qry->partName,
                     sizeof(qry->partName)))
            break;
    }

    if (queue_IsEnd(&pendingQueue, np))
        np = NULL;
    return np;
}
static struct SalvageQueueNode *
LookupPendingCommandByPid(int pid)
{
    struct SalvageQueueNode * np, * nnp;

    for (queue_Scan(&pendingQueue, np, nnp, SalvageQueueNode)) {
        if (np->pid == pid)
            break;
    }

    if (queue_IsEnd(&pendingQueue, np))
        np = NULL;
    return np;
}
/* raise the priority of a previously scheduled salvage */
static void
UpdateCommandPrio(struct SalvageQueueNode * node)
{
    struct SalvageQueueNode *np, *nnp;
    afs_int32 id;
    afs_uint32 prio;

    osi_Assert(queue_IsOnQueue(node));

    prio = node->command.sop.prio;
    id = node->partition_id;
    if (queue_First(&salvageQueue.part[id], SalvageQueueNode)->command.sop.prio < prio) {
        queue_Remove(node);
        queue_Prepend(&salvageQueue.part[id], node);
    } else {
        for (queue_ScanBackwardsFrom(&salvageQueue.part[id], node, np, nnp, SalvageQueueNode)) {
            if (np->command.sop.prio > prio)
                break;
        }
        if (queue_IsEnd(&salvageQueue.part[id], np)) {
            queue_Remove(node);
            queue_Prepend(&salvageQueue.part[id], node);
        } else if (node != np) {
            queue_Remove(node);
            queue_InsertAfter(np, node);
        }
    }
}
/* this will need to be rearchitected if we ever want more than one thread
 * to wait for new salvage nodes */
struct SalvageQueueNode *
SALVSYNC_getWork(void)
{
    int i;
    struct DiskPartition64 * dp = NULL, * fdp;
    static afs_int32 next_part_sched = 0;
    struct SalvageQueueNode *node = NULL;

    /*
     * wait for work to be scheduled
     * if there are no disk partitions, just sit in this wait loop forever
     */
    while (!salvageQueue.total_len || !DiskPartitionList) {
        VOL_CV_WAIT(&salvageQueue.cv);
    }

    /*
     * short circuit for simple case where only one partition has
     * scheduled salvages
     */
    if (salvageQueue.last_insert >= 0 && salvageQueue.last_insert <= VOLMAXPARTS &&
        (salvageQueue.total_len == salvageQueue.len[salvageQueue.last_insert])) {
        node = queue_First(&salvageQueue.part[salvageQueue.last_insert], SalvageQueueNode);
        goto have_node;
    }

    /*
     * ok, more than one partition has scheduled salvages.
     * now search for partitions with scheduled salvages, but no pending salvages.
     */
    dp = VGetPartitionById_r(next_part_sched, 0);
    if (!dp) {
        dp = DiskPartitionList;
    }
    fdp = dp;

    for (i = 0;
         !i || dp != fdp;
         dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
        if (!partition_salvaging[dp->index] && salvageQueue.len[dp->index]) {
            node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
            goto have_node;
        }
    }

    /*
     * all partitions with scheduled salvages have at least one pending.
     * now do an exhaustive search for a scheduled salvage.
     */
    for (i = 0;
         !i || dp != fdp;
         dp = (dp->next) ? dp->next : DiskPartitionList, i++ ) {
        if (salvageQueue.len[dp->index]) {
            node = queue_First(&salvageQueue.part[dp->index], SalvageQueueNode);
            goto have_node;
        }
    }

    /* we should never reach this line */
    osi_Panic("Node not found\n");

 have_node:
    osi_Assert(node != NULL);
    partition_salvaging[node->partition_id]++;
    DeleteFromSalvageQueue(node);
    AddToPendingQueue(node);

    if (dp) {
        /* update next_part_sched field */
        if (dp->next) {
            next_part_sched = dp->next->index;
        } else if (DiskPartitionList) {
            next_part_sched = DiskPartitionList->index;
        } else {
            next_part_sched = -1;
        }
    }

    return node;
}
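
/*
 * Illustrative scenario: with salvages queued on two partitions and one of
 * them already running a salvage, the first scan above skips the busy
 * partition and dispatches the head of the other partition's queue; only
 * when every partition with queued work is busy does the exhaustive second
 * scan hand out another salvage for a partition that already has one
 * pending.
 */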
/**
 * update internal scheduler state to reflect completion of a work unit.
 *
 * @param[in]  node    salvage queue node object pointer
 * @param[in]  result  worker process result code
 *
 * @post scheduler state is updated.
 */
static void
SALVSYNC_doneWork_r(struct SalvageQueueNode * node, int result)
{
    afs_int32 partid;
    int idx;

    DeleteFromPendingQueue(node);
    partid = node->partition_id;
    if (partid >= 0 && partid <= VOLMAXPARTS) {
        partition_salvaging[partid]--;
    }

    if (result == 0) {
        node->state = SALVSYNC_STATE_DONE;
    } else if (result != SALSRV_EXIT_VOLGROUP_LINK) {
        node->state = SALVSYNC_STATE_ERROR;
    }

    if (node->type == SALVSYNC_VOLGROUP_PARENT) {
        for (idx = 0; idx < VOLMAXTYPES; idx++) {
            if (node->volgroup.children[idx]) {
                node->volgroup.children[idx]->state = node->state;
            }
        }
    }
}
/**
 * check whether worker child failed.
 *
 * @param[in] status  status bitfield returned by wait()
 *
 * @return boolean failure code
 *    @retval 0 child succeeded
 *    @retval 1 child failed
 */
static int
ChildFailed(int status)
{
    return (WCOREDUMP(status) ||
            WIFSIGNALED(status) ||
            ((WEXITSTATUS(status) != 0) &&
             (WEXITSTATUS(status) != SALSRV_EXIT_VOLGROUP_LINK)));
}
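
/*
 * Example: a salvager child killed by SIGSEGV has WIFSIGNALED(status)
 * nonzero, so ChildFailed() returns true and SALVSYNC_doneWorkByPid()
 * below attempts fileserver notification; a child that exits with
 * SALSRV_EXIT_VOLGROUP_LINK is treated as a non-failure even though its
 * exit status is nonzero.
 */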
/**
 * notify salvsync scheduler of node completion, by child pid.
 *
 * @param[in]  pid     pid of worker child
 * @param[in]  status  worker status bitfield from wait()
 *
 * @post scheduler state is updated.
 *       if status code is a failure, fileserver notification was attempted
 *
 * @see SALVSYNC_doneWork_r
 */
void
SALVSYNC_doneWorkByPid(int pid, int status)
{
    struct SalvageQueueNode * node;
    char partName[16];
    afs_uint32 volids[VOLMAXTYPES+1];
    int idx;

    memset(volids, 0, sizeof(volids));

    node = LookupPendingCommandByPid(pid);
    if (node != NULL) {
        SALVSYNC_doneWork_r(node, status);

        if (ChildFailed(status)) {
            /* populate volume id list for later processing outside the glock */
            volids[0] = node->command.sop.volume;
            strcpy(partName, node->command.sop.partName);
            if (node->type == SALVSYNC_VOLGROUP_PARENT) {
                for (idx = 0; idx < VOLMAXTYPES; idx++) {
                    if (node->volgroup.children[idx]) {
                        volids[idx+1] = node->volgroup.children[idx]->command.sop.volume;
                    }
                }
            }
        }
    }

    /*
     * if necessary, notify fileserver of
     * failure to salvage volume group
     * [we cannot guarantee that the child made the
     *  appropriate notifications (e.g. SIGSEGV)]
     *  -- tkeiser 11/28/2007
     */
    if (ChildFailed(status)) {
        for (idx = 0; idx <= VOLMAXTYPES; idx++) {
            if (volids[idx]) {
                FSYNC_VolOp(volids[idx],
                            partName,
                            FSYNC_VOL_FORCE_ERROR,
                            FSYNC_SALVAGE,
                            NULL);
            }
        }
    }
}
#endif /* AFS_DEMAND_ATTACH_FS */