1 {-# LANGUAGE FlexibleContexts #-}
3 -----------------------------------------------------------------------------
5 -----------------------------------------------------------------------------
8 -- Module : Distribution.Client.JobControl
9 -- Copyright : (c) Duncan Coutts 2012
12 -- Maintainer : cabal-devel@haskell.org
13 -- Stability : provisional
14 -- Portability : portable
16 -- A job control concurrency abstraction
17 module Distribution
.Client
.JobControl
20 , newParallelJobControl
21 , newSemaphoreJobControl
36 import Distribution
.Client
.Compat
.Prelude
39 import Control
.Concurrent
(forkIO
, forkIOWithUnmask
, threadDelay
)
40 import Control
.Concurrent
.MVar
41 import Control
.Concurrent
.STM
(STM
, TVar
, atomically
, modifyTVar
', newTVarIO
, readTVar
)
42 import Control
.Concurrent
.STM
.TChan
43 import Control
.Exception
(bracket_, mask_
, try)
44 import Control
.Monad
(forever
, replicateM_
)
45 import Distribution
.Client
.Compat
.Semaphore
46 import Distribution
.Compat
.Stack
47 import Distribution
.Simple
.Utils
48 import Distribution
.Verbosity
49 import System
.Semaphore
51 -- | A simple concurrency abstraction. Jobs can be spawned and can complete
52 -- in any order. This allows both serial and parallel implementations.
--
-- A job is an action of type @m a@: @m@ is the effect type in which the jobs
-- and the control operations run (every constructor function visible in this
-- file instantiates it to 'IO'), and @a@ is the result a job produces.
--
-- NOTE(review): two of the field comments below ("Wait until one job is
-- complete" and "Try to cancel any outstanding ... jobs") have no field
-- declaration in front of them in this copy. The record constructions later
-- in the file assign @collectJob@ and @cancelJobs@, so those two
-- declarations, and the closing brace of the record, appear to be missing
-- here; confirm against the full file.
53 data JobControl m a
= JobControl
54 { spawnJob
:: m a
-> m
()
55 -- ^ Add a new job to the pool of jobs
57 -- ^ Wait until one job is complete
58 , remainingJobs
:: m
Bool
59 -- ^ Returns True if there are any outstanding jobs
60 -- (ie spawned but yet to be collected)
62 -- ^ Try to cancel any outstanding but not-yet-started jobs.
63 -- Call 'remainingJobs' after this to find out if any jobs are left
64 -- (ie could not be cancelled).
65 , cleanupJobControl
:: m
()
66 -- ^ Clean up any resources created by the JobControl, intended to be used
67 -- as the finaliser for `bracket`.
68 , jobControlSemaphore
:: Maybe SemaphoreName
69 -- ^ Name of the semaphore which can be used to control parallelism, if one
70 -- is available for that job control type.
73 -- | Make a 'JobControl' that executes all jobs serially and in order.
74 -- It only executes jobs on demand when they are collected, not eagerly.
76 -- Cancelling will cancel /all/ jobs that have not been collected yet.
--
-- All four operations work on one 'TChan' of pending jobs (@qVar@). No
-- clean-up action is needed and no semaphore is exposed.
--
-- NOTE(review): this copy of the definition has gaps. The statement that
-- creates @qVar@, the expression that opens the record, the @where@ that
-- introduces the helpers, and the equation heads of @collect@ and @cancel@
-- are not visible. Confirm against the full file before editing the code.
77 newSerialJobControl
:: IO (JobControl
IO a
)
78 newSerialJobControl
= do
-- Every field is a helper partially applied to the shared job queue.
82 { spawnJob
= spawn qVar
83 , collectJob
= collect qVar
84 , remainingJobs
= remaining qVar
85 , cancelJobs
= cancel qVar
-- Nothing to release: this implementation only holds the in-memory queue.
86 , cleanupJobControl
= return ()
87 , jobControlSemaphore
= Nothing
-- Append the job to the queue. The job is not run here.
90 spawn
:: TChan
(IO a
) -> IO a
-> IO ()
91 spawn qVar job
= atomically
$ writeTChan qVar job
-- Take the next queued job and run it in the calling thread: the STM
-- transaction returns the job itself and 'join' then executes it.
93 collect
:: TChan
(IO a
) -> IO a
95 join $ atomically
$ readTChan qVar
-- True when at least one job is still sitting in the queue.
97 remaining
:: TChan
(IO a
) -> IO Bool
98 remaining qVar
= fmap not $ atomically
$ isEmptyTChan qVar
-- Drain the queue and throw the drained jobs away without running them.
100 cancel
:: TChan
(IO a
) -> IO ()
102 _
<- atomically
$ readAllTChan qVar
105 -- | Make a 'JobControl' that eagerly executes jobs in parallel, with a given
106 -- maximum degree of parallelism.
108 -- Cancelling will cancel jobs that have not yet begun executing, but jobs
109 -- that have already been executed or are currently executing cannot be
-- cancelled.
--
-- The argument is the number of worker actions to start. Values below 1 or
-- above 1000 are rejected with a call to 'error'.
--
-- NOTE(review): this copy of the definition has gaps. The creation of
-- @inqVar@, the line between @replicateM_ maxJobLimit $@ and the @worker@
-- call, the expression that opens the record, the @where@, the statement
-- that binds @res@ in @worker@, and the STM wrappers in @spawn@, @collect@
-- and @cancel@ are not visible. Confirm against the full file before editing
-- the code.
111 newParallelJobControl
:: WithCallStack
(Int -> IO (JobControl
IO a
))
-- Sanity check on the requested degree of parallelism.
112 newParallelJobControl n
113 | n
< 1 || n
> 1000 =
114 error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
115 newParallelJobControl maxJobLimit
= do
-- outqVar carries finished results back to 'collectJob'.
117 outqVar
<- newTChanIO
-- countVar counts jobs that were spawned but not yet collected: it is
-- incremented in spawn and decremented in collect and cancel.
118 countVar
<- newTVarIO
0
-- Start maxJobLimit copies of the worker action.
-- NOTE(review): presumably each copy is forked onto its own thread by the
-- line that is not visible here; confirm.
119 replicateM_ maxJobLimit
$
121 worker inqVar outqVar
124 { spawnJob
= spawn inqVar countVar
125 , collectJob
= collect outqVar countVar
126 , remainingJobs
= remaining countVar
127 , cancelJobs
= cancel inqVar countVar
128 , cleanupJobControl
= return ()
129 , jobControlSemaphore
= Nothing
-- Worker: take a job from the input queue and publish its outcome on the
-- output queue.
-- NOTE(review): the statement that runs the job and binds res is not
-- visible. The output element type, Either SomeException a, suggests the
-- job's exception is captured rather than propagated; confirm.
132 worker
:: TChan
(IO a
) -> TChan
(Either SomeException a
) -> IO ()
133 worker inqVar outqVar
=
135 job
<- atomically
$ readTChan inqVar
137 atomically
$ writeTChan outqVar res
-- Count the job as outstanding and hand it to the workers.
-- NOTE(review): both statements are STM actions; the enclosing atomically
-- is not visible in this copy.
139 spawn
:: TChan
(IO a
) -> TVar
Int -> IO a
-> IO ()
140 spawn inqVar countVar job
=
142 modifyTVar
' countVar
(+ 1)
143 writeTChan inqVar job
-- Decrement the outstanding count and obtain one finished outcome, then
-- rethrow the job's exception in the caller or return the job's value.
-- NOTE(review): the read from outqVar that should end the STM block is not
-- visible in this copy.
145 collect
:: TChan
(Either SomeException a
) -> TVar
Int -> IO a
146 collect outqVar countVar
= do
147 res
<- atomically
$ do
148 modifyTVar
' countVar
(subtract 1)
150 either throwIO
return res
-- True while the outstanding-job counter is non-zero.
152 remaining
:: TVar
Int -> IO Bool
153 remaining countVar
= fmap (/= 0) $ atomically
$ readTVar countVar
-- Remove every job still waiting in the input queue and deduct that many
-- from the outstanding count. Jobs already taken by a worker are unaffected.
155 cancel
:: TChan
(IO a
) -> TVar
Int -> IO ()
156 cancel inqVar countVar
=
158 xs
<- readAllTChan inqVar
159 modifyTVar
' countVar
(subtract (length xs
))
-- | Drain a 'TChan': remove every element currently in the channel and return
-- them in the order they would have been read (oldest first).
--
-- Never blocks: elements are taken with 'tryReadTChan', so an empty channel
-- yields @[]@. The whole drain is one 'STM' action, so a caller can combine
-- it atomically with other updates.
readAllTChan :: TChan a -> STM [a]
readAllTChan qvar = go []
  where
    -- Accumulate newest-first and reverse once when the channel runs dry.
    go xs = do
      mx <- tryReadTChan qvar
      case mx of
        Nothing -> return (reverse xs)
        Just x -> go (x : xs)
170 -- | Make a 'JobControl' where the parallelism is controlled by a semaphore.
172 -- This uses the GHC -jsem option to allow GHC to take additional semaphore slots
173 -- if we are not using them all.
--
-- The argument is the number of slots the semaphore is created with. Values
-- below 1 or above 1000 are rejected with a call to 'error'. The resulting
-- 'JobControl' exposes the semaphore's name through 'jobControlSemaphore'
-- and destroys the semaphore in 'cleanupJobControl'.
--
-- NOTE(review): the error message in the guard names newParallelJobControl
-- although this is newSemaphoreJobControl; it looks copied from the function
-- above.
--
-- NOTE(review): this copy of the definition has gaps. The call that emits
-- the "Created semaphore called" message (and the rest of that message), the
-- creation of @inqVar@, the expression that opens the record, the @where@,
-- the masking and semaphore acquisition that the comment inside @worker@
-- describes, the delay that the last comment in @worker@ describes, and the
-- STM wrappers in @spawn@, @collect@ and @cancel@ are not visible. Confirm
-- against the full file before editing the code.
174 newSemaphoreJobControl
:: WithCallStack
(Int -> IO (JobControl
IO a
))
-- Sanity check on the requested number of slots.
175 newSemaphoreJobControl n
176 | n
< 1 || n
> 1000 =
177 error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
178 newSemaphoreJobControl maxJobLimit
= do
-- Create the semaphore with maxJobLimit slots, using "cabal_semaphore" as
-- the name passed to freshSemaphore.
179 sem
<- freshSemaphore
"cabal_semaphore" maxJobLimit
181 "Created semaphore called "
182 ++ getSemaphoreName
(semaphoreName sem
)
-- outqVar carries finished results back to 'collectJob'.
186 outqVar
<- newTChanIO
-- countVar counts jobs that were spawned but not yet collected.
188 countVar
<- newTVarIO
0
-- One dispatcher thread is forked here; it forks a further thread per job
-- (see worker below).
189 void
(forkIO
(worker sem inqVar outqVar
))
192 { spawnJob
= spawn inqVar countVar
193 , collectJob
= collect outqVar countVar
194 , remainingJobs
= remaining countVar
195 , cancelJobs
= cancel inqVar countVar
-- The semaphore is a resource created by this function, so the clean-up
-- action destroys it.
196 , cleanupJobControl
= destroySemaphore sem
197 , jobControlSemaphore
= Just
(semaphoreName sem
)
-- Dispatcher: take the next job from the input queue and run it on a
-- freshly forked thread.
200 worker
:: Semaphore
-> TChan
(IO a
) -> TChan
(Either SomeException a
) -> IO ()
201 worker sem inqVar outqVar
=
203 job
<- atomically
$ readTChan inqVar
204 -- mask here, as we need to ensure that the thread which contains the
205 -- release action is spawned. Otherwise, there is the chance that an
206 -- async exception is thrown between the semaphore being taken and the
207 -- thread being spawned.
210 void
$ forkIOWithUnmask
$ \unmask
-> do
-- Run the job with asynchronous exceptions unmasked, capturing either its
-- result or the exception it threw.
211 res
<- try (unmask job
)
-- Give one slot back whether the job succeeded or failed; 'try' has
-- already caught any exception from the job.
212 releaseSemaphore sem
1
-- Publish the outcome for 'collectJob'.
213 atomically
$ writeTChan outqVar res
214 -- Try to give GHC enough time to compute the module graph and then
215 -- request some additional capabilities if it can make use of them. The
216 -- ideal situation is that we have 1 GHC process running which has taken
217 -- all the capabilities in the semaphore, as this will reduce memory usage.
219 -- 0.25s is chosen by discussion between MP and SD on Mar 17th 2023 as a number
220 -- which isn't too big and not too small but also, not scientifically.
-- Count the job as outstanding and queue it for the dispatcher.
-- NOTE(review): both statements are STM actions; the enclosing atomically
-- is not visible in this copy.
223 spawn
:: TChan
(IO a
) -> TVar
Int -> IO a
-> IO ()
224 spawn inqVar countVar job
=
226 modifyTVar
' countVar
(+ 1)
227 writeTChan inqVar job
-- Decrement the outstanding count and obtain one finished outcome, then
-- rethrow the job's exception in the caller or return the job's value.
-- NOTE(review): the read from outqVar that should end the STM block is not
-- visible in this copy.
229 collect
:: TChan
(Either SomeException a
) -> TVar
Int -> IO a
230 collect outqVar countVar
= do
231 res
<- atomically
$ do
232 modifyTVar
' countVar
(subtract 1)
234 either throwIO
return res
-- True while the outstanding-job counter is non-zero.
236 remaining
:: TVar
Int -> IO Bool
237 remaining countVar
= fmap (/= 0) $ atomically
$ readTVar countVar
-- Remove every job still waiting in the input queue and deduct that many
-- from the outstanding count. Jobs already handed to a thread are unaffected.
239 cancel
:: TChan
(IO a
) -> TVar
Int -> IO ()
240 cancel inqVar countVar
=
242 xs
<- readAllTChan inqVar
243 modifyTVar
' countVar
(subtract (length xs
))
245 -------------------------
246 -- Job limits and locks
-- | A cap on how many actions may run at the same time, represented by a
-- quantity semaphore ('QSem').
--
-- Declared as a @newtype@, like 'Lock' in this module: the wrapper has a
-- single constructor with a single field, so it needs no runtime
-- representation of its own.
newtype JobLimit = JobLimit QSem
-- | Create a 'JobLimit' backed by a new quantity semaphore that starts with
-- @n@ available units.
newJobLimit :: Int -> IO JobLimit
newJobLimit n =
  fmap JobLimit (newQSem n)
-- | Run an action while holding one unit of the given 'JobLimit'.
--
-- The unit is taken before the action starts and handed back once it
-- finishes, including when the action throws ('bracket_').
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit slots) = bracket_ acquire release
  where
    acquire = waitQSem slots
    release = signalQSem slots
-- | A mutual-exclusion lock, represented by an 'MVar' that holds @()@.
newtype Lock = Lock (MVar ())
-- | Create a new 'Lock'. Its 'MVar' starts out full, so the first
-- 'criticalSection' can take it immediately.
newLock :: IO Lock
newLock = fmap Lock (newMVar ())
-- | Run an action while holding the 'Lock'.
--
-- The lock's 'MVar' is emptied before the action runs and refilled
-- afterwards, including when the action throws ('bracket_'). Another caller
-- on the same 'Lock' waits in 'takeMVar' until then.
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock mvar) body = bracket_ acquire release body
  where
    acquire = takeMVar mvar
    release = putMVar mvar ()