1 {-# LANGUAGE FlexibleContexts #-}
2 -----------------------------------------------------------------------------
4 -- Module : Distribution.Client.JobControl
5 -- Copyright : (c) Duncan Coutts 2012
8 -- Maintainer : cabal-devel@haskell.org
9 -- Stability : provisional
10 -- Portability : portable
12 -- A job control concurrency abstraction
13 -----------------------------------------------------------------------------
14 module Distribution
.Client
.JobControl
(
17 newParallelJobControl
,
32 import Distribution
.Client
.Compat
.Prelude
35 import Control
.Monad
(forever
, replicateM_
)
36 import Control
.Concurrent
(forkIO
)
37 import Control
.Concurrent
.MVar
38 import Control
.Concurrent
.STM
(STM
, atomically
)
39 import Control
.Concurrent
.STM
.TVar
40 import Control
.Concurrent
.STM
.TChan
41 import Control
.Exception
(bracket_, try)
42 import Distribution
.Compat
.Stack
43 import Distribution
.Client
.Compat
.Semaphore
46 -- | A simple concurrency abstraction. Jobs can be spawned and can complete
47 -- in any order. This allows both serial and parallel implementations.
49 data JobControl m a
= JobControl
{
50 -- | Add a new job to the pool of jobs
51 spawnJob
:: m a
-> m
(),
53 -- | Wait until one job is complete
56 -- | Returns True if there are any outstanding jobs
57 -- (ie spawned but yet to be collected)
58 remainingJobs
:: m
Bool,
60 -- | Try to cancel any outstanding but not-yet-started jobs.
61 -- Call 'remainingJobs' after this to find out if any jobs are left
62 -- (ie could not be cancelled).
67 -- | Make a 'JobControl' that executes all jobs serially and in order.
68 -- It only executes jobs on demand when they are collected, not eagerly.
70 -- Cancelling will cancel /all/ jobs that have not been collected yet.
72 newSerialJobControl
:: IO (JobControl
IO a
)
73 newSerialJobControl
= do
76 spawnJob
= spawn qVar
,
77 collectJob
= collect qVar
,
78 remainingJobs
= remaining qVar
,
79 cancelJobs
= cancel qVar
82 spawn
:: TChan
(IO a
) -> IO a
-> IO ()
83 spawn qVar job
= atomically
$ writeTChan qVar job
85 collect
:: TChan
(IO a
) -> IO a
87 join $ atomically
$ readTChan qVar
89 remaining
:: TChan
(IO a
) -> IO Bool
90 remaining qVar
= fmap not $ atomically
$ isEmptyTChan qVar
92 cancel
:: TChan
(IO a
) -> IO ()
94 _
<- atomically
$ readAllTChan qVar
97 -- | Make a 'JobControl' that eagerly executes jobs in parallel, with a given
98 -- maximum degree of parallelism.
100 -- Cancelling will cancel jobs that have not yet begun executing, but jobs
101 -- that have already been executed or are currently executing cannot be
104 newParallelJobControl
:: WithCallStack
(Int -> IO (JobControl
IO a
))
105 newParallelJobControl n | n
< 1 || n
> 1000 =
106 error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
107 newParallelJobControl maxJobLimit
= do
109 outqVar
<- newTChanIO
110 countVar
<- newTVarIO
0
111 replicateM_ maxJobLimit
$
113 worker inqVar outqVar
115 spawnJob
= spawn inqVar countVar
,
116 collectJob
= collect outqVar countVar
,
117 remainingJobs
= remaining countVar
,
118 cancelJobs
= cancel inqVar countVar
121 worker
:: TChan
(IO a
) -> TChan
(Either SomeException a
) -> IO ()
122 worker inqVar outqVar
=
124 job
<- atomically
$ readTChan inqVar
126 atomically
$ writeTChan outqVar res
128 spawn
:: TChan
(IO a
) -> TVar
Int -> IO a
-> IO ()
129 spawn inqVar countVar job
=
131 modifyTVar
' countVar
(+1)
132 writeTChan inqVar job
134 collect
:: TChan
(Either SomeException a
) -> TVar
Int -> IO a
135 collect outqVar countVar
= do
136 res
<- atomically
$ do
137 modifyTVar
' countVar
(subtract 1)
139 either throwIO
return res
141 remaining
:: TVar
Int -> IO Bool
142 remaining countVar
= fmap (/=0) $ atomically
$ readTVar countVar
144 cancel
:: TChan
(IO a
) -> TVar
Int -> IO ()
145 cancel inqVar countVar
=
147 xs
<- readAllTChan inqVar
148 modifyTVar
' countVar
(subtract (length xs
))
150 readAllTChan
:: TChan a
-> STM
[a
]
151 readAllTChan qvar
= go
[]
154 mx
<- tryReadTChan qvar
156 Nothing
-> return (reverse xs
)
159 -------------------------
160 -- Job limits and locks
163 data JobLimit
= JobLimit QSem
165 newJobLimit
:: Int -> IO JobLimit
167 fmap JobLimit
(newQSem n
)
169 withJobLimit
:: JobLimit
-> IO a
-> IO a
170 withJobLimit
(JobLimit sem
) =
171 bracket_ (waitQSem sem
) (signalQSem sem
)
173 newtype Lock
= Lock
(MVar
())
176 newLock
= fmap Lock
$ newMVar
()
178 criticalSection
:: Lock
-> IO a
-> IO a
179 criticalSection
(Lock lck
) act
= bracket_ (takeMVar lck
) (putMVar lck
()) act