1 {-# LANGUAGE FlexibleContexts #-}
3 -----------------------------------------------------------------------------
5 -----------------------------------------------------------------------------
8 -- Module : Distribution.Client.JobControl
9 -- Copyright : (c) Duncan Coutts 2012
12 -- Maintainer : cabal-devel@haskell.org
13 -- Stability : provisional
14 -- Portability : portable
16 -- A job control concurrency abstraction
17 module Distribution
.Client
.JobControl
20 , newParallelJobControl
21 , newSemaphoreJobControl
35 -- * Higher level utils
36 , newJobControlFromParStrat
38 , mapConcurrentWithJobs
41 import Distribution
.Client
.Compat
.Prelude
44 import Control
.Concurrent
(forkIO
, forkIOWithUnmask
, threadDelay
)
45 import Control
.Concurrent
.MVar
46 import Control
.Concurrent
.STM
(STM
, TVar
, atomically
, modifyTVar
', newTVarIO
, readTVar
)
47 import Control
.Concurrent
.STM
.TChan
48 import Control
.Exception
(bracket, bracket_, mask_
, try)
49 import Control
.Monad
(forever
, replicateM_
)
50 import Distribution
.Client
.Compat
.Semaphore
51 import Distribution
.Client
.Utils
(numberOfProcessors
)
52 import Distribution
.Compat
.Stack
53 import Distribution
.Simple
.Compiler
54 import Distribution
.Simple
.Utils
55 import Distribution
.Types
.ParStrat
56 import System
.Semaphore
58 -- | A simple concurrency abstraction. Jobs can be spawned and can complete
59 -- in any order. This allows both serial and parallel implementations.
60 data JobControl m a
= JobControl
61 { spawnJob
:: m a
-> m
()
62 -- ^ Add a new job to the pool of jobs
64 -- ^ Wait until one job is complete
65 , remainingJobs
:: m
Bool
66 -- ^ Returns True if there are any outstanding jobs
67 -- (ie spawned but yet to be collected)
69 -- ^ Try to cancel any outstanding but not-yet-started jobs.
70 -- Call 'remainingJobs' after this to find out if any jobs are left
71 -- (ie could not be cancelled).
72 , cleanupJobControl
:: m
()
73 -- ^ cleanup any resources created by the JobControl, intended to be used
74 -- as the finaliser for `bracket`.
75 , jobControlSemaphore
:: Maybe SemaphoreName
76 -- ^ Name of the semaphore which can be used to control parallelism, if one
77 -- is available for that job control type.
80 -- | Make a 'JobControl' that executes all jobs serially and in order.
81 -- It only executes jobs on demand when they are collected, not eagerly.
83 -- Cancelling will cancel /all/ jobs that have not been collected yet.
84 newSerialJobControl
:: IO (JobControl
IO a
)
85 newSerialJobControl
= do
89 { spawnJob
= spawn qVar
90 , collectJob
= collect qVar
91 , remainingJobs
= remaining qVar
92 , cancelJobs
= cancel qVar
93 , cleanupJobControl
= return ()
94 , jobControlSemaphore
= Nothing
97 spawn
:: TChan
(IO a
) -> IO a
-> IO ()
98 spawn qVar job
= atomically
$ writeTChan qVar job
100 collect
:: TChan
(IO a
) -> IO a
102 join $ atomically
$ readTChan qVar
104 remaining
:: TChan
(IO a
) -> IO Bool
105 remaining qVar
= fmap not $ atomically
$ isEmptyTChan qVar
107 cancel
:: TChan
(IO a
) -> IO ()
109 _
<- atomically
$ readAllTChan qVar
112 -- | Make a 'JobControl' that eagerly executes jobs in parallel, with a given
113 -- maximum degree of parallelism.
-- Cancelling will cancel jobs that have not yet begun executing, but jobs
-- that have already been executed or are currently executing cannot be
-- cancelled.
118 newParallelJobControl
:: WithCallStack
(Int -> IO (JobControl
IO a
))
119 newParallelJobControl n
120 | n
< 1 || n
> 1000 =
121 error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
122 newParallelJobControl maxJobLimit
= do
124 outqVar
<- newTChanIO
125 countVar
<- newTVarIO
0
126 replicateM_ maxJobLimit
$
128 worker inqVar outqVar
131 { spawnJob
= spawn inqVar countVar
132 , collectJob
= collect outqVar countVar
133 , remainingJobs
= remaining countVar
134 , cancelJobs
= cancel inqVar countVar
135 , cleanupJobControl
= return ()
136 , jobControlSemaphore
= Nothing
139 worker
:: TChan
(IO a
) -> TChan
(Either SomeException a
) -> IO ()
140 worker inqVar outqVar
=
142 job
<- atomically
$ readTChan inqVar
144 atomically
$ writeTChan outqVar res
146 spawn
:: TChan
(IO a
) -> TVar
Int -> IO a
-> IO ()
147 spawn inqVar countVar job
=
149 modifyTVar
' countVar
(+ 1)
150 writeTChan inqVar job
152 collect
:: TChan
(Either SomeException a
) -> TVar
Int -> IO a
153 collect outqVar countVar
= do
154 res
<- atomically
$ do
155 modifyTVar
' countVar
(subtract 1)
157 either throwIO
return res
159 remaining
:: TVar
Int -> IO Bool
160 remaining countVar
= fmap (/= 0) $ atomically
$ readTVar countVar
162 cancel
:: TChan
(IO a
) -> TVar
Int -> IO ()
163 cancel inqVar countVar
=
165 xs
<- readAllTChan inqVar
166 modifyTVar
' countVar
(subtract (length xs
))
168 readAllTChan
:: TChan a
-> STM
[a
]
169 readAllTChan qvar
= go
[]
172 mx
<- tryReadTChan qvar
174 Nothing
-> return (reverse xs
)
175 Just x
-> go
(x
: xs
)
-- | Make a 'JobControl' where the parallelism is controlled by a semaphore.
--
-- This uses the GHC -jsem option to allow GHC to take additional semaphore slots
-- if we are not using them all.
--
-- A job limit below 1 or above 1000 is rejected with a call to 'error'.
newSemaphoreJobControl :: WithCallStack (Verbosity -> Int -> IO (JobControl IO a))
newSemaphoreJobControl _ n
  | n < 1 || n > 1000 =
      -- Report this function's own name (the message previously named
      -- 'newParallelJobControl'), so the failure points at the right caller.
      error $ "newSemaphoreJobControl: not a sensible number of jobs: " ++ show n
185 newSemaphoreJobControl verbosity maxJobLimit
= do
186 sem
<- freshSemaphore
"cabal_semaphore" maxJobLimit
188 "Created semaphore called "
189 ++ getSemaphoreName
(semaphoreName sem
)
193 outqVar
<- newTChanIO
195 countVar
<- newTVarIO
0
196 void
(forkIO
(worker sem inqVar outqVar
))
199 { spawnJob
= spawn inqVar countVar
200 , collectJob
= collect outqVar countVar
201 , remainingJobs
= remaining countVar
202 , cancelJobs
= cancel inqVar countVar
203 , cleanupJobControl
= destroySemaphore sem
204 , jobControlSemaphore
= Just
(semaphoreName sem
)
207 worker
:: Semaphore
-> TChan
(IO a
) -> TChan
(Either SomeException a
) -> IO ()
208 worker sem inqVar outqVar
=
210 job
<- atomically
$ readTChan inqVar
211 -- mask here, as we need to ensure that the thread which contains the
212 -- release action is spawned. Otherwise, there is the chance that an
213 -- async exception is thrown between the semaphore being taken and the
214 -- thread being spawned.
217 void
$ forkIOWithUnmask
$ \unmask
-> do
218 res
<- try (unmask job
)
219 releaseSemaphore sem
1
220 atomically
$ writeTChan outqVar res
221 -- Try to give GHC enough time to compute the module graph and then
222 -- request some additional capabilities if it can make use of them. The
223 -- ideal situation is that we have 1 GHC process running which has taken
224 -- all the capabilities in the semaphore, as this will reduce memory usage.
-- 0.25s was chosen in a discussion between MP and SD on Mar 17th 2023 as a
-- number that is neither too big nor too small; it was not derived scientifically.
230 spawn
:: TChan
(IO a
) -> TVar
Int -> IO a
-> IO ()
231 spawn inqVar countVar job
=
233 modifyTVar
' countVar
(+ 1)
234 writeTChan inqVar job
236 collect
:: TChan
(Either SomeException a
) -> TVar
Int -> IO a
237 collect outqVar countVar
= do
238 res
<- atomically
$ do
239 modifyTVar
' countVar
(subtract 1)
241 either throwIO
return res
243 remaining
:: TVar
Int -> IO Bool
244 remaining countVar
= fmap (/= 0) $ atomically
$ readTVar countVar
246 cancel
:: TChan
(IO a
) -> TVar
Int -> IO ()
247 cancel inqVar countVar
=
249 xs
<- readAllTChan inqVar
250 modifyTVar
' countVar
(subtract (length xs
))
252 -------------------------
253 -- Job limits and locks
-- | A wrapper around the 'QSem' that 'withJobLimit' waits on before running
-- an action and signals once the action has finished.
--
-- It has exactly one constructor with exactly one field, so it is declared
-- as a @newtype@ (like 'Lock') rather than @data@; this avoids an extra
-- runtime box around the semaphore.
newtype JobLimit = JobLimit QSem
258 newJobLimit
:: Int -> IO JobLimit
260 fmap JobLimit
(newQSem n
)
-- | Run an action while holding one unit of the given 'JobLimit'.
--
-- The wrapped semaphore is waited on before the action starts and is
-- signalled again when the action ends, including when it ends with an
-- exception (this is what 'bracket_' provides).
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit slots) action =
  bracket_ acquire release action
  where
    acquire = waitQSem slots
    release = signalQSem slots
-- | A lock represented as an 'MVar' holding a unit value: 'criticalSection'
-- takes the value before running its action and puts it back afterwards.
newtype Lock = Lock (MVar ())
-- Create a fresh 'Lock'. Its 'MVar' starts out full (holding @()@), so the
-- first 'criticalSection' can take it without blocking.
newLock = Lock <$> newMVar ()
-- | Run an action while holding the given 'Lock'.
--
-- The lock's 'MVar' is emptied before the action runs and refilled with
-- @()@ afterwards, whether the action returns normally or throws (this is
-- what 'bracket_' provides).
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock mvar) act = bracket_ acquire release act
  where
    acquire = takeMVar mvar
    release = putMVar mvar ()
274 --------------------------------------------------------------------------------
275 -- More high level utils
276 --------------------------------------------------------------------------------
278 newJobControlFromParStrat
282 -- ^ The parallel strategy
284 -- ^ A cap on the number of jobs (e.g. to force a maximum of 2 concurrent downloads despite a -j8 parallel strategy)
285 -> IO (JobControl
IO a
)
286 newJobControlFromParStrat verbosity compiler parStrat numJobsCap
= case parStrat
of
287 Serial
-> newSerialJobControl
288 NumJobs n
-> newParallelJobControl
(capJobs
(fromMaybe numberOfProcessors n
))
290 if jsemSupported compiler
291 then newSemaphoreJobControl verbosity
(capJobs n
)
293 warn verbosity
"-jsem is not supported by the selected compiler, falling back to normal parallelism control."
294 newParallelJobControl
(capJobs n
)
296 capJobs n
= min (fromMaybe maxBound numJobsCap
) n
-- | Build a 'JobControl' with the given action, hand it to the supplied
-- function, and run its 'cleanupJobControl' afterwards, even if that
-- function exits with an exception (this is what 'bracket' provides).
withJobControl :: IO (JobControl IO a) -> (JobControl IO a -> IO b) -> IO b
withJobControl mkJC useJC = bracket mkJC cleanupJobControl useJC
-- | Apply an action to every element of a list, running the actions as jobs
-- of the given 'JobControl'.
--
-- How many jobs run at once is determined by the 'JobControl' instance.
-- The results are returned in collection order, which does /not/ have to
-- match the order of the input list!
mapConcurrentWithJobs :: JobControl IO b -> (a -> IO b) -> [a] -> IO [b]
mapConcurrentWithJobs jobControl f xs = do
  -- First hand every job to the job control ...
  spawnAll
  -- ... then collect exactly one result per input element.
  collectAll
  where
    spawnAll = traverse_ (\x -> spawnJob jobControl (f x)) xs
    collectAll = traverse (\_ -> collectJob jobControl) xs