1 /* $NetBSD: rf_engine.c,v 1.39 2006/11/16 01:33:23 christos Exp $ */
3 * Copyright (c) 1995 Carnegie-Mellon University.
6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef
8 * Permission to use, copy, modify and distribute this software and
9 * its documentation is hereby granted, provided that both the copyright
10 * notice and this permission notice appear in all copies of the
11 * software, derivative works or modified versions, and any portions
12 * thereof, and that both notices appear in supporting documentation.
14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
18 * Carnegie Mellon requests users of this software to return to
20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
21 * School of Computer Science
22 * Carnegie Mellon University
23 * Pittsburgh PA 15213-3890
25 * any improvements or extensions that they make and grant Carnegie the
26 * rights to redistribute these changes.
29 /****************************************************************************
31 * engine.c -- code for DAG execution engine *
33 * Modified to work as follows (holland): *
34 * A user-thread calls into DispatchDAG, which fires off the nodes that *
35 * are direct successors to the header node. DispatchDAG then returns, *
36 * and the rest of the I/O continues asynchronously. As each node *
37 * completes, the node execution function calls FinishNode(). FinishNode *
38 * scans the list of successors to the node and increments the antecedent *
39 * counts. Each node that becomes enabled is placed on a central node *
40 * queue. A dedicated dag-execution thread grabs nodes off of this *
41 * queue and fires them. *
43 * NULL nodes are never fired. *
45 * Terminator nodes are never fired, but rather cause the callback *
46 * associated with the DAG to be invoked. *
48 * If a node fails, the dag either rolls forward to the completion or *
49 * rolls back, undoing previously-completed nodes and fails atomically. *
50 * The direction of recovery is determined by the location of the failed *
51 * node in the graph. If the failure occurred before the commit node in *
52 * the graph, backward recovery is used. Otherwise, forward recovery is *
55 ****************************************************************************/
57 #include <sys/cdefs.h>
58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.39 2006/11/16 01:33:23 christos Exp $");
60 #include <sys/errno.h>
62 #include "rf_threadstuff.h"
64 #include "rf_engine.h"
65 #include "rf_etimer.h"
66 #include "rf_general.h"
67 #include "rf_dagutils.h"
68 #include "rf_shutdown.h"
71 #include "rf_paritymap.h"
/*
 * File-local forward declarations: the shutdown hook registered by
 * rf_ConfigureEngine() and the two thread entry points it passes to
 * RF_CREATE_ENGINE_THREAD.
 */
73 static void rf_ShutdownEngine(void *);
74 static void DAGExecutionThread(RF_ThreadArg_t arg
);
75 static void rf_RaidIOThread(RF_ThreadArg_t arg
);
77 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */
/*
 * In each macro _r_ is dereferenced for its node_queue_mutex and/or
 * node_queue members: lock and unlock act on the mutex, wait sleeps
 * on node_queue with the mutex, and signal broadcasts on node_queue.
 */
79 #define DO_LOCK(_r_) \
82 RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
85 #define DO_UNLOCK(_r_) \
87 RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
91 #define DO_WAIT(_r_) \
92 RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)
94 #define DO_SIGNAL(_r_) \
95 RF_BROADCAST_COND((_r_)->node_queue) /* XXX RF_SIGNAL_COND? */
/*
 * Shutdown hook for the engine.  arg is cast back to RF_Raid_t *.
 *
 * With iodone_lock held, sets shutdown_raidio and wakes any sleeper on
 * &raidPtr->iodone, then sleeps on &raidPtr->shutdown_raidio until the
 * flag reads zero again (rf_RaidIOThread clears it on exit).  After
 * releasing the lock it sets shutdown_engine.
 * NOTE(review): nothing visible here wakes a thread waiting on the
 * node queue once shutdown_engine is set -- confirm a DO_SIGNAL
 * follows.
 */
98 rf_ShutdownEngine(void *arg
)
103 raidPtr
= (RF_Raid_t
*) arg
;
105 /* Tell the rf_RaidIOThread to shutdown */
106 simple_lock(&(raidPtr
->iodone_lock
));
108 raidPtr
->shutdown_raidio
= 1;
109 wakeup(&(raidPtr
->iodone
));
111 /* ...and wait for it to tell us it has finished */
112 while (raidPtr
->shutdown_raidio
)
113 ltsleep(&(raidPtr
->shutdown_raidio
), PRIBIO
, "raidshutdown", 0,
114 &(raidPtr
->iodone_lock
));
116 simple_unlock(&(raidPtr
->iodone_lock
));
118 /* Now shut down the DAG execution engine. */
120 raidPtr
->shutdown_engine
= 1;
/*
 * Per-array engine setup.
 *
 * listp:   shutdown list on which rf_ShutdownEngine is registered.
 * raidPtr: the array being configured.
 *
 * Initialises node_queue_mutex, sets node_queue to NULL and
 * dags_in_flight to 0, then creates two threads with
 * RF_CREATE_ENGINE_THREAD: DAGExecutionThread (named "raid%d") and
 * rf_RaidIOThread (named "raidio%d"), both given raidPtr as their
 * argument.  A non-zero result from either creation is reported with
 * printf.  Progress messages are printed when rf_engineDebug is set.
 */
127 rf_ConfigureEngine(RF_ShutdownList_t
**listp
, RF_Raid_t
*raidPtr
,
131 rf_mutex_init(&raidPtr
->node_queue_mutex
);
132 raidPtr
->node_queue
= NULL
;
133 raidPtr
->dags_in_flight
= 0;
135 /* we create the execution thread only once per system boot. no need
136 * to check return code b/c the kernel panics if it can't create the
139 if (rf_engineDebug
) {
140 printf("raid%d: Creating engine thread\n", raidPtr
->raidid
);
143 if (RF_CREATE_ENGINE_THREAD(raidPtr
->engine_thread
,
144 DAGExecutionThread
, raidPtr
,
145 "raid%d", raidPtr
->raidid
)) {
146 printf("raid%d: Unable to create engine thread\n",
150 if (RF_CREATE_ENGINE_THREAD(raidPtr
->engine_helper_thread
,
151 rf_RaidIOThread
, raidPtr
,
152 "raidio%d", raidPtr
->raidid
)) {
153 printf("raid%d: Unable to create raidio thread\n",
158 if (rf_engineDebug
) {
159 printf("raid%d: Created engine thread\n", raidPtr
->raidid
);
163 /* engine thread is now running and waiting for work */
165 if (rf_engineDebug
) {
166 printf("raid%d: Engine thread running and waiting for events\n", raidPtr
->raidid
);
169 rf_ShutdownCreate(listp
, rf_ShutdownEngine
, raidPtr
);
/*
 * Recursive check over the branch rooted at `node`, selected by
 * node->status.  One arm calls BranchDone() on every entry of
 * node->succedents; the remaining arms are described by their own
 * comments below.
 * NOTE(review): the case labels and most return statements are not
 * visible here, so the exact status-to-result mapping should be
 * confirmed against the full source.
 */
175 BranchDone(RF_DagNode_t
*node
)
179 /* return true if forward execution is completed for a node and it's
181 switch (node
->status
) {
183 /* should never be called in this state */
187 /* node is currently executing, so we're not done */
190 /* for each succedent recursively check branch */
191 for (i
= 0; i
< node
->numSuccedents
; i
++)
192 if (!BranchDone(node
->succedents
[i
]))
194 return RF_TRUE
; /* node and all succedent branches aren't in
197 /* succedents can't fire */
200 /* should never be called in this state */
205 /* XXX need to fix this case */
206 /* for now, assume that we're done */
209 /* illegal node status */
/*
 * Readiness test for `node`, selected by the owning DAG's status
 * (node->dagHdr->status).
 *
 * Visible tests: a node in rf_wait whose numAntDone equals
 * numAntecedents; and, in the rf_rollBackward arm, a node in rf_good
 * whose numSuccDone equals numSuccedents (after asserting the
 * successor counters are mutually consistent).  Any other DAG status
 * prints an "illegal DAG status" message.
 * NOTE(review): callers use the result as a boolean; the return
 * statements themselves are not visible here -- presumably true when
 * a test above holds.
 */
216 NodeReady(RF_DagNode_t
*node
)
220 switch (node
->dagHdr
->status
) {
223 if ((node
->status
== rf_wait
) &&
224 (node
->numAntecedents
== node
->numAntDone
))
229 case rf_rollBackward
:
230 RF_ASSERT(node
->numSuccDone
<= node
->numSuccedents
);
231 RF_ASSERT(node
->numSuccFired
<= node
->numSuccedents
);
232 RF_ASSERT(node
->numSuccFired
<= node
->numSuccDone
);
233 if ((node
->status
== rf_good
) &&
234 (node
->numSuccDone
== node
->numSuccedents
))
240 printf("Execution engine found illegal DAG status in NodeReady\n");
250 /* user context and dag-exec-thread context: Fire a node. The node's
251 * status field determines which function, do or undo, to be fired.
252 * This routine assumes that the node's status field has already been
253 * set to "fired" or "recover" to indicate the direction of execution.
/*
 * Dispatches on node->status: one arm invokes node->doFunc(node), the
 * other node->undoFunc(node).  When rf_engineDebug is set, each arm
 * first prints the array id, the node's address and node->name.
 * As far as visible here, the RF_DAGNODE_FLAG_YIELD tests guard only
 * commented-out code.
 */
256 FireNode(RF_DagNode_t
*node
)
258 switch (node
->status
) {
260 /* fire the do function of a node */
262 if (rf_engineDebug
) {
263 printf("raid%d: Firing node 0x%lx (%s)\n",
264 node
->dagHdr
->raidPtr
->raidid
,
265 (unsigned long) node
, node
->name
);
268 if (node
->flags
& RF_DAGNODE_FLAG_YIELD
) {
269 #if defined(__NetBSD__) && defined(_KERNEL)
270 /* thread_block(); */
271 /* printf("Need to block the thread here...\n"); */
272 /* XXX thread_block is actually mentioned in
273 * /usr/include/vm/vm_extern.h */
278 (*(node
->doFunc
)) (node
);
281 /* fire the undo function of a node */
283 if (rf_engineDebug
) {
284 printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
285 node
->dagHdr
->raidPtr
->raidid
,
286 (unsigned long) node
, node
->name
);
289 if (node
->flags
& RF_DAGNODE_FLAG_YIELD
)
290 #if defined(__NetBSD__) && defined(_KERNEL)
291 /* thread_block(); */
292 /* printf("Need to block the thread here...\n"); */
293 /* XXX thread_block is actually mentioned in
294 * /usr/include/vm/vm_extern.h */
298 (*(node
->undoFunc
)) (node
);
309 * Attempt to fire each node in a linear array.
310 * The entire list is fired atomically.
/*
 * numNodes: number of entries in nodeList.
 * nodeList: array of node pointers to consider.
 *
 * Pass 1 examines each node for which NodeReady() holds.  If the DAG
 * status is rf_enable or rf_rollForward the node becomes rf_fired,
 * numCommits is bumped when it is a commit node, and numSuccFired is
 * incremented on each of its antecedents.  Otherwise the DAG is
 * asserted to be rolling backward and the node becomes rf_recover.
 * Pass 2 calls FireNode() on every entry now marked rf_fired or
 * rf_recover.
 */
313 FireNodeArray(int numNodes
, RF_DagNode_t
**nodeList
)
315 RF_DagStatus_t dstat
;
319 /* first, mark all nodes which are ready to be fired */
320 for (i
= 0; i
< numNodes
; i
++) {
322 dstat
= node
->dagHdr
->status
;
323 RF_ASSERT((node
->status
== rf_wait
) ||
324 (node
->status
== rf_good
));
325 if (NodeReady(node
)) {
326 if ((dstat
== rf_enable
) ||
327 (dstat
== rf_rollForward
)) {
328 RF_ASSERT(node
->status
== rf_wait
);
329 if (node
->commitNode
)
330 node
->dagHdr
->numCommits
++;
331 node
->status
= rf_fired
;
332 for (j
= 0; j
< node
->numAntecedents
; j
++)
333 node
->antecedents
[j
]->numSuccFired
++;
335 RF_ASSERT(dstat
== rf_rollBackward
);
336 RF_ASSERT(node
->status
== rf_good
);
337 /* only one commit node per graph */
338 RF_ASSERT(node
->commitNode
== RF_FALSE
);
339 node
->status
= rf_recover
;
343 /* now, fire the nodes */
344 for (i
= 0; i
< numNodes
; i
++) {
345 if ((nodeList
[i
]->status
== rf_fired
) ||
346 (nodeList
[i
]->status
== rf_recover
))
347 FireNode(nodeList
[i
]);
353 * Attempt to fire each node in a linked list.
354 * The entire list is fired atomically.
/*
 * nodeList: head of a list of nodes (presumably chained through
 *           node->next; the line that loads `next` is not visible
 *           here).
 *
 * Same two-pass scheme as FireNodeArray(): pass 1 marks each node for
 * which NodeReady() holds as rf_fired (DAG in rf_enable or
 * rf_rollForward, also updating numCommits and the antecedents'
 * numSuccFired) or rf_recover (DAG asserted to be rf_rollBackward);
 * pass 2 selects the nodes whose status is rf_fired or rf_recover.
 */
357 FireNodeList(RF_DagNode_t
*nodeList
)
359 RF_DagNode_t
*node
, *next
;
360 RF_DagStatus_t dstat
;
364 /* first, mark all nodes which are ready to be fired */
365 for (node
= nodeList
; node
; node
= next
) {
367 dstat
= node
->dagHdr
->status
;
368 RF_ASSERT((node
->status
== rf_wait
) ||
369 (node
->status
== rf_good
));
370 if (NodeReady(node
)) {
371 if ((dstat
== rf_enable
) ||
372 (dstat
== rf_rollForward
)) {
373 RF_ASSERT(node
->status
== rf_wait
);
374 if (node
->commitNode
)
375 node
->dagHdr
->numCommits
++;
376 node
->status
= rf_fired
;
377 for (j
= 0; j
< node
->numAntecedents
; j
++)
378 node
->antecedents
[j
]->numSuccFired
++;
380 RF_ASSERT(dstat
== rf_rollBackward
);
381 RF_ASSERT(node
->status
== rf_good
);
382 /* only one commit node per graph */
383 RF_ASSERT(node
->commitNode
== RF_FALSE
);
384 node
->status
= rf_recover
;
388 /* now, fire the nodes */
389 for (node
= nodeList
; node
; node
= next
) {
391 if ((node
->status
== rf_fired
) ||
392 (node
->status
== rf_recover
))
397 /* interrupt context:
399 * propagate required results from node to succedent
400 * increment succedent's numAntDone
401 * place newly-enabled nodes on node queue for firing
403 * To save context switches, we don't place NIL nodes on the node queue,
404 * but rather just process them as if they had fired. Note that NIL nodes
405 * that are the direct successors of the header will actually get fired by
406 * DispatchDAG, which is fine because no context switches are involved.
408 * Important: when running at user level, this can be called by any
409 * disk thread, and so the increment and check of the antecedent count
410 * must be locked. I used the node queue mutex and locked down the
411 * entire function, but this is certainly overkill.
/*
 * node:    the node whose completion is being propagated.
 * context: compared against RF_INTR_CONTEXT to choose between firing
 *          a ready node from here and queueing it for the engine
 *          thread.
 *
 * Builds up to three local lists.  finishlist holds ready nodes whose
 * function is rf_NullNodeFunc; they are marked rf_good and completed
 * in place through rf_FinishNode().  skiplist holds nodes with a
 * failed rf_trueData antecedent; they are marked rf_skipped and also
 * finished.  firelist is handed to FireNodeList().  Nodes meant for
 * the engine thread are chained on q/qh and spliced onto the front of
 * raidPtr->node_queue.  The rf_rollBackward arm applies the same
 * scheme to the node's antecedents, testing undoFunc instead of
 * doFunc.
 */
414 PropagateResults(RF_DagNode_t
*node
, int context
)
419 RF_DagNode_t
*finishlist
= NULL
; /* a list of NIL nodes to be
421 RF_DagNode_t
*skiplist
= NULL
; /* list of nodes with failed truedata
423 RF_DagNode_t
*firelist
= NULL
; /* a list of nodes to be fired */
424 RF_DagNode_t
*q
= NULL
, *qh
= NULL
, *next
;
427 raidPtr
= node
->dagHdr
->raidPtr
;
431 /* debug - validate fire counts */
432 for (i
= 0; i
< node
->numAntecedents
; i
++) {
433 a
= *(node
->antecedents
+ i
);
434 RF_ASSERT(a
->numSuccFired
>= a
->numSuccDone
);
435 RF_ASSERT(a
->numSuccFired
<= a
->numSuccedents
);
439 switch (node
->dagHdr
->status
) {
442 for (i
= 0; i
< node
->numSuccedents
; i
++) {
443 s
= *(node
->succedents
+ i
);
444 RF_ASSERT(s
->status
== rf_wait
);
446 if (s
->numAntDone
== s
->numAntecedents
) {
447 /* look for NIL nodes */
448 if (s
->doFunc
== rf_NullNodeFunc
) {
449 /* don't fire NIL nodes, just process
451 s
->next
= finishlist
;
454 /* look to see if the node is to be
457 for (j
= 0; j
< s
->numAntecedents
; j
++)
458 if ((s
->antType
[j
] == rf_trueData
) && (s
->antecedents
[j
]->status
== rf_bad
))
461 /* this node has one or more
463 * dependencies, so skip it */
467 /* add s to list of nodes (q)
469 if (context
!= RF_INTR_CONTEXT
) {
471 * enqueue if we're at
486 RF_ASSERT(NodeReady(s
));
500 /* xfer our local list of nodes to the node queue */
501 q
->next
= raidPtr
->node_queue
;
502 raidPtr
->node_queue
= qh
;
507 for (; skiplist
; skiplist
= next
) {
508 next
= skiplist
->next
;
509 skiplist
->status
= rf_skipped
;
510 for (i
= 0; i
< skiplist
->numAntecedents
; i
++) {
511 skiplist
->antecedents
[i
]->numSuccFired
++;
513 if (skiplist
->commitNode
) {
514 skiplist
->dagHdr
->numCommits
++;
516 rf_FinishNode(skiplist
, context
);
518 for (; finishlist
; finishlist
= next
) {
519 /* NIL nodes: no need to fire them */
520 next
= finishlist
->next
;
521 finishlist
->status
= rf_good
;
522 for (i
= 0; i
< finishlist
->numAntecedents
; i
++) {
523 finishlist
->antecedents
[i
]->numSuccFired
++;
525 if (finishlist
->commitNode
)
526 finishlist
->dagHdr
->numCommits
++;
528 * Okay, here we're calling rf_FinishNode() on
529 * nodes that have the null function as their
530 * work proc. Such a node could be the
531 * terminal node in a DAG. If so, it will
532 * cause the DAG to complete, which will in
533 * turn free memory used by the DAG, which
534 * includes the node in question. Thus, we
535 * must avoid referencing the node at all
536 * after calling rf_FinishNode() on it. */
537 rf_FinishNode(finishlist
, context
); /* recursive call */
539 /* fire all nodes in firelist */
540 FireNodeList(firelist
);
543 case rf_rollBackward
:
544 for (i
= 0; i
< node
->numAntecedents
; i
++) {
545 a
= *(node
->antecedents
+ i
);
546 RF_ASSERT(a
->status
== rf_good
);
547 RF_ASSERT(a
->numSuccDone
<= a
->numSuccedents
);
548 RF_ASSERT(a
->numSuccDone
<= a
->numSuccFired
);
550 if (a
->numSuccDone
== a
->numSuccFired
) {
551 if (a
->undoFunc
== rf_NullNodeFunc
) {
552 /* don't fire NIL nodes, just process
554 a
->next
= finishlist
;
557 if (context
!= RF_INTR_CONTEXT
) {
558 /* we only have to enqueue if
559 * we're at intr context */
560 /* put node on a list to be
561 fired after we unlock */
566 /* enqueue the node for the
567 dag exec thread to fire */
568 RF_ASSERT(NodeReady(a
));
581 /* xfer our local list of nodes to the node queue */
582 q
->next
= raidPtr
->node_queue
;
583 raidPtr
->node_queue
= qh
;
587 for (; finishlist
; finishlist
= next
) {
588 /* NIL nodes: no need to fire them */
589 next
= finishlist
->next
;
590 finishlist
->status
= rf_good
;
592 * Okay, here we're calling rf_FinishNode() on
593 * nodes that have the null function as their
594 * work proc. Such a node could be the first
595 * node in a DAG. If so, it will cause the DAG
596 * to complete, which will in turn free memory
597 * used by the DAG, which includes the node in
598 * question. Thus, we must avoid referencing
599 * the node at all after calling
600 * rf_FinishNode() on it. */
601 rf_FinishNode(finishlist
, context
); /* recursive call */
603 /* fire all nodes in firelist */
604 FireNodeList(firelist
);
608 printf("Engine found illegal DAG status in PropagateResults()\n");
617 * Process a fired node which has completed
/*
 * node:    a fired node whose do/undo function has finished.
 * context: passed through unchanged to PropagateResults().
 *
 * Examines node->status.  When a node has failed, the DAG is switched
 * to rf_rollForward if a commit has already happened (numCommits > 0)
 * or the DAG has no commit nodes at all, and to rf_rollBackward
 * otherwise; the choice is logged when rf_engineDebug is set.  A
 * failed undo and an unrecognised status are each reported with
 * printf.  Finally the node's results are propagated to its
 * neighbours.
 */
620 ProcessNode(RF_DagNode_t
*node
, int context
)
624 raidPtr
= node
->dagHdr
->raidPtr
;
626 switch (node
->status
) {
628 /* normal case, don't need to do anything */
631 if ((node
->dagHdr
->numCommits
> 0) ||
632 (node
->dagHdr
->numCommitNodes
== 0)) {
633 /* crossed commit barrier */
634 node
->dagHdr
->status
= rf_rollForward
;
636 if (rf_engineDebug
) {
637 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr
->raidid
, node
->name
);
641 /* never reached commit barrier */
642 node
->dagHdr
->status
= rf_rollBackward
;
644 if (rf_engineDebug
) {
645 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr
->raidid
, node
->name
);
651 /* normal rollBackward case, don't need to do anything */
654 /* an undo node failed!!! */
655 printf("UNDO of a node failed!!!\n");
658 printf("node finished execution with an illegal status!!!\n");
663 /* enqueue node's succedents (antecedents if rollBackward) for
665 PropagateResults(node
, context
);
670 /* user context or dag-exec-thread context:
671 * This is the first step in post-processing a newly-completed node.
672 * This routine is called by each node execution function to mark the node
673 * as complete and fire off any successors that have been enabled.
/*
 * node:    the node that has just finished executing.
 * context: forwarded unchanged to ProcessNode().
 *
 * Increments the owning DAG's numNodesCompleted and hands the node to
 * ProcessNode().  retcode is initialised to RF_FALSE.
 * NOTE(review): the return statement is not visible here --
 * presumably retcode is what is returned; confirm.
 */
676 rf_FinishNode(RF_DagNode_t
*node
, int context
)
678 int retcode
= RF_FALSE
;
679 node
->dagHdr
->numNodesCompleted
++;
680 ProcessNode(node
, context
);
686 /* user context: submit dag for execution, return non-zero if we have
687 * to wait for completion. if and only if we return non-zero, we'll
688 * cause cbFunc to get invoked with cbArg when the DAG has completed.
690 * for now we always return 1. If the DAG does not cause any I/O,
691 * then the callback may get invoked before DispatchDAG returns.
692 * There's code in state 5 of ContinueRaidAccess to handle this.
694 * All we do here is fire the direct successors of the header node.
695 * The DAG execution thread does the rest of the dag processing. */
/*
 * dag:    header of the DAG to run.
 * cbFunc: completion callback; stored in dag->cbFunc.
 * (The rest of the parameter list is not visible here.)
 *
 * Invokes RF_ETIMER_START on dag->tracerec->timer, optionally runs
 * rf_ValidateDAG() (only when built with RF_DEBUG_VALIDATE_DAG and
 * rf_engineDebug or rf_validateDAGDebug is set), increments
 * raidPtr->dags_in_flight, resets dag->numNodesCompleted, sets the
 * DAG status to rf_enable and fires the header's succedents through
 * FireNodeArray().
 */
697 rf_DispatchDAG(RF_DagHeader_t
*dag
, void (*cbFunc
) (void *),
702 raidPtr
= dag
->raidPtr
;
705 RF_ETIMER_START(dag
->tracerec
->timer
);
709 #if RF_DEBUG_VALIDATE_DAG
710 if (rf_engineDebug
|| rf_validateDAGDebug
) {
711 if (rf_ValidateDAG(dag
))
717 if (rf_engineDebug
) {
718 printf("raid%d: Entering DispatchDAG\n", raidPtr
->raidid
);
721 raidPtr
->dags_in_flight
++; /* debug only: blow off proper
723 dag
->cbFunc
= cbFunc
;
725 dag
->numNodesCompleted
= 0;
726 dag
->status
= rf_enable
;
727 FireNodeArray(dag
->numSuccedents
, dag
->succedents
);
730 /* dedicated kernel thread: the thread that handles all DAG node
731 * firing. To minimize locking and unlocking, we grab a copy of the
732 * entire node queue and then set the node queue to NULL before doing
733 * any firing of nodes. This way we only have to release the lock
734 * once. Of course, it's probably rare that there's more than one
735 * node in the queue at any one time, but it sometimes happens.
/*
 * arg: cast to RF_Raid_t * for the array this thread serves.
 *
 * Loops until raidPtr->shutdown_engine is set.  While node_queue is
 * non-NULL it detaches the whole queue into local_nq and resets
 * node_queue to NULL, then sorts the detached nodes by DAG status:
 * a node with no succedents (or, when rolling backward, no
 * antecedents) ends its DAG, the rest still have to run.  For each
 * terminal node it invokes dagHdr->cbFunc(dagHdr->cbArg) and
 * decrements dags_in_flight; the remaining nodes go to
 * FireNodeList().  The final inner loop spins on "no shutdown and
 * empty queue" -- presumably around a DO_WAIT that is not visible
 * here.
 */
739 DAGExecutionThread(RF_ThreadArg_t arg
)
741 RF_DagNode_t
*nd
, *local_nq
, *term_nq
, *fire_nq
;
746 raidPtr
= (RF_Raid_t
*) arg
;
749 if (rf_engineDebug
) {
750 printf("raid%d: Engine thread is running\n", raidPtr
->raidid
);
756 while (!raidPtr
->shutdown_engine
) {
758 while (raidPtr
->node_queue
!= NULL
) {
759 local_nq
= raidPtr
->node_queue
;
762 raidPtr
->node_queue
= NULL
;
765 /* first, strip out the terminal nodes */
768 local_nq
= local_nq
->next
;
769 switch (nd
->dagHdr
->status
) {
772 if (nd
->numSuccedents
== 0) {
773 /* end of the dag, add to
778 /* not the end, add to the
784 case rf_rollBackward
:
785 if (nd
->numAntecedents
== 0) {
786 /* end of the dag, add to the
791 /* not the end, add to the
803 /* execute callback of dags which have reached the
807 term_nq
= term_nq
->next
;
809 (nd
->dagHdr
->cbFunc
) (nd
->dagHdr
->cbArg
);
810 raidPtr
->dags_in_flight
--; /* debug only */
813 /* fire remaining nodes */
814 FireNodeList(fire_nq
);
818 while (!raidPtr
->shutdown_engine
&&
819 raidPtr
->node_queue
== NULL
) {
830 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy()
831 * puts the I/O on a buf_queue, and then signals raidPtr->iodone. If
832 * necessary, this function calls raidstart() to initiate the I/O.
833 * When I/O to a component completes, KernelWakeupFunc() puts the
834 * completed request onto raidPtr->iodone TAILQ. This function looks
835 * after requests on that queue by calling rf_DiskIOComplete() for the
836 * request, and by calling any required CompleteFunc for the request.
/*
 * arg: cast to RF_Raid_t * for the array this thread serves.
 *
 * Holds iodone_lock except while calling out.  Until shutdown_raidio
 * is set, each pass:
 *   - sleeps on &raidPtr->iodone if the iodone queue is empty and
 *     rf_buf_queue_check() returns non-zero;
 *   - if a parity map exists, drops the lock around
 *     rf_paritymap_checkwork();
 *   - drains the iodone queue, dropping the lock around
 *     rf_DiskIOComplete() and the request's CompleteFunc;
 *   - drops and retakes the lock around the outgoing-I/O step.
 * On exit it clears shutdown_raidio and wakes the sleeper on that
 * address (rf_ShutdownEngine), then releases the lock.
 */
840 rf_RaidIOThread(RF_ThreadArg_t arg
)
843 RF_DiskQueueData_t
*req
;
846 raidPtr
= (RF_Raid_t
*) arg
;
849 simple_lock(&(raidPtr
->iodone_lock
));
851 while (!raidPtr
->shutdown_raidio
) {
852 /* if there is nothing to do, then snooze. */
853 if (TAILQ_EMPTY(&(raidPtr
->iodone
)) &&
854 rf_buf_queue_check(raidPtr
->raidid
)) {
855 ltsleep(&(raidPtr
->iodone
), PRIBIO
, "raidiow", 0,
856 &(raidPtr
->iodone_lock
));
859 /* Check for deferred parity-map-related work. */
860 if (raidPtr
->parity_map
!= NULL
) {
861 simple_unlock(&(raidPtr
->iodone_lock
));
862 rf_paritymap_checkwork(raidPtr
->parity_map
);
863 simple_lock(&(raidPtr
->iodone_lock
));
866 /* See what I/Os, if any, have arrived */
867 while ((req
= TAILQ_FIRST(&(raidPtr
->iodone
))) != NULL
) {
868 TAILQ_REMOVE(&(raidPtr
->iodone
), req
, iodone_entries
);
869 simple_unlock(&(raidPtr
->iodone_lock
));
870 rf_DiskIOComplete(req
->queue
, req
, req
->error
);
871 (req
->CompleteFunc
) (req
->argument
, req
->error
);
872 simple_lock(&(raidPtr
->iodone_lock
));
875 /* process any pending outgoing IO */
876 simple_unlock(&(raidPtr
->iodone_lock
));
878 simple_lock(&(raidPtr
->iodone_lock
));
882 /* Let rf_ShutdownEngine know that we're done... */
883 raidPtr
->shutdown_raidio
= 0;
884 wakeup(&(raidPtr
->shutdown_raidio
));
886 simple_unlock(&(raidPtr
->iodone_lock
));