Create changelogs for 3.14.1.0 (#10591)
[cabal.git] / cabal-install / src / Distribution / Client / JobControl.hs
blob9cc7ac92a05535d7d6ba5b045a8670e30e184cbf
1 {-# LANGUAGE FlexibleContexts #-}
3 -----------------------------------------------------------------------------
5 -----------------------------------------------------------------------------
7 -- |
8 -- Module : Distribution.Client.JobControl
9 -- Copyright : (c) Duncan Coutts 2012
10 -- License : BSD-like
12 -- Maintainer : cabal-devel@haskell.org
13 -- Stability : provisional
14 -- Portability : portable
16 -- A job control concurrency abstraction
17 module Distribution.Client.JobControl
18 ( JobControl
19 , newSerialJobControl
20 , newParallelJobControl
21 , newSemaphoreJobControl
22 , spawnJob
23 , collectJob
24 , remainingJobs
25 , cancelJobs
26 , cleanupJobControl
27 , jobControlSemaphore
28 , JobLimit
29 , newJobLimit
30 , withJobLimit
31 , Lock
32 , newLock
33 , criticalSection
35 -- * Higher level utils
36 , newJobControlFromParStrat
37 , withJobControl
38 , mapConcurrentWithJobs
39 ) where
41 import Distribution.Client.Compat.Prelude
42 import Prelude ()
44 import Control.Concurrent (forkIO, forkIOWithUnmask, threadDelay)
45 import Control.Concurrent.MVar
46 import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar', newTVarIO, readTVar)
47 import Control.Concurrent.STM.TChan
48 import Control.Exception (bracket, bracket_, mask_, try)
49 import Control.Monad (forever, replicateM_)
50 import Distribution.Client.Compat.Semaphore
51 import Distribution.Client.Utils (numberOfProcessors)
52 import Distribution.Compat.Stack
53 import Distribution.Simple.Compiler
54 import Distribution.Simple.Utils
55 import Distribution.Types.ParStrat
56 import System.Semaphore
-- | A small concurrency abstraction for running jobs. Jobs may be spawned at
-- any time and may finish in any order, so the same interface can be backed
-- by either a serial or a parallel implementation.
data JobControl m a = JobControl
  { spawnJob :: m a -> m ()
  -- ^ Submit a new job to the pool of jobs.
  , collectJob :: m a
  -- ^ Wait until one job has completed.
  , remainingJobs :: m Bool
  -- ^ 'True' while there are outstanding jobs, i.e. jobs that have been
  -- spawned but have not been collected yet.
  , cancelJobs :: m ()
  -- ^ Attempt to cancel outstanding jobs that have not started yet. Call
  -- 'remainingJobs' afterwards to learn whether any jobs are left (i.e. could
  -- not be cancelled).
  , cleanupJobControl :: m ()
  -- ^ Release any resources created by the 'JobControl'; intended to be used
  -- as the finaliser for @bracket@.
  , jobControlSemaphore :: Maybe SemaphoreName
  -- ^ Name of the semaphore that can be used to control parallelism, when the
  -- job control type provides one.
  }
-- | Build a 'JobControl' that runs every job serially, in the order in which
-- the jobs were spawned. Nothing is run eagerly: a job is executed only at the
-- moment it is collected.
--
-- Cancelling discards /all/ jobs that have not been collected yet.
newSerialJobControl :: IO (JobControl IO a)
newSerialJobControl = do
  queue <- newTChanIO
  pure
    JobControl
      { spawnJob = enqueue queue
      , collectJob = runNext queue
      , remainingJobs = hasPending queue
      , cancelJobs = discardAll queue
      , cleanupJobControl = pure ()
      , jobControlSemaphore = Nothing
      }
  where
    -- Append the (not yet executed) job to the queue.
    enqueue :: TChan (IO a) -> IO a -> IO ()
    enqueue queue job = atomically (writeTChan queue job)

    -- Take the oldest queued job and execute it in the calling thread.
    runNext :: TChan (IO a) -> IO a
    runNext queue = do
      job <- atomically (readTChan queue)
      job

    -- Jobs remain exactly when the queue is non-empty.
    hasPending :: TChan (IO a) -> IO Bool
    hasPending queue = not <$> atomically (isEmptyTChan queue)

    -- Drop every queued job without running it.
    discardAll :: TChan (IO a) -> IO ()
    discardAll queue = void (atomically (readAllTChan queue))
-- | Build a 'JobControl' that eagerly runs jobs in parallel, bounded by the
-- given maximum degree of parallelism.
--
-- Cancelling only removes jobs that have not begun executing; jobs that are
-- currently executing or have already been executed cannot be cancelled.
--
-- Calls 'error' when the requested number of jobs is below 1 or above 1000.
newParallelJobControl :: WithCallStack (Int -> IO (JobControl IO a))
newParallelJobControl maxJobLimit
  | maxJobLimit < 1 || maxJobLimit > 1000 =
      error $
        "newParallelJobControl: not a sensible number of jobs: "
          ++ show maxJobLimit
  | otherwise = do
      pending <- newTChanIO
      finished <- newTChanIO
      outstanding <- newTVarIO 0
      -- Start a fixed pool of worker threads, one per permitted job.
      replicateM_ maxJobLimit (forkIO (workerLoop pending finished))
      pure
        JobControl
          { spawnJob = submit pending outstanding
          , collectJob = awaitResult finished outstanding
          , remainingJobs = anyOutstanding outstanding
          , cancelJobs = dropPending pending outstanding
          , cleanupJobControl = pure ()
          , jobControlSemaphore = Nothing
          }
  where
    -- Repeatedly take a job, run it, and publish its outcome (result or
    -- exception) on the output channel.
    workerLoop :: TChan (IO a) -> TChan (Either SomeException a) -> IO ()
    workerLoop pending finished =
      forever $ do
        job <- atomically (readTChan pending)
        outcome <- try job
        atomically (writeTChan finished outcome)

    -- Count the job as outstanding and queue it, in a single transaction.
    submit :: TChan (IO a) -> TVar Int -> IO a -> IO ()
    submit pending outstanding job =
      atomically $ do
        modifyTVar' outstanding (+ 1)
        writeTChan pending job

    -- Wait for one finished job; rethrow its exception if it failed.
    awaitResult :: TChan (Either SomeException a) -> TVar Int -> IO a
    awaitResult finished outstanding = do
      outcome <- atomically $ do
        modifyTVar' outstanding (subtract 1)
        readTChan finished
      case outcome of
        Left err -> throwIO err
        Right val -> pure val

    anyOutstanding :: TVar Int -> IO Bool
    anyOutstanding outstanding = (/= 0) <$> atomically (readTVar outstanding)

    -- Remove every job still waiting in the input queue and adjust the
    -- outstanding count by the number removed.
    dropPending :: TChan (IO a) -> TVar Int -> IO ()
    dropPending pending outstanding =
      atomically $ do
        dropped <- readAllTChan pending
        modifyTVar' outstanding (subtract (length dropped))
-- | Drain a 'TChan' without blocking: read items until 'tryReadTChan' reports
-- that the channel is empty, returning them in the order they were read.
readAllTChan :: TChan a -> STM [a]
readAllTChan chan = drain
  where
    drain = tryReadTChan chan >>= maybe (pure []) (\item -> (item :) <$> drain)
-- | Make a 'JobControl' where the parallelism is controlled by a semaphore.
--
-- This uses the GHC -jsem option to allow GHC to take additional semaphore slots
-- if we are not using them all.
--
-- The 'Verbosity' is used to announce the name and size of the created
-- semaphore; the 'Int' is the number of semaphore slots. Calls 'error' when
-- the requested number of jobs is below 1 or above 1000.
--
-- The resulting 'cleanupJobControl' destroys the semaphore, and
-- 'jobControlSemaphore' exposes its name.
newSemaphoreJobControl :: WithCallStack (Verbosity -> Int -> IO (JobControl IO a))
newSemaphoreJobControl verbosity maxJobLimit
  | maxJobLimit < 1 || maxJobLimit > 1000 =
      -- Report this function's own name so the failure is attributed correctly.
      error $
        "newSemaphoreJobControl: not a sensible number of jobs: "
          ++ show maxJobLimit
  | otherwise = do
      sem <- freshSemaphore "cabal_semaphore" maxJobLimit
      notice verbosity $
        "Created semaphore called "
          ++ getSemaphoreName (semaphoreName sem)
          ++ " with "
          ++ show maxJobLimit
          ++ " slots."
      outqVar <- newTChanIO
      inqVar <- newTChanIO
      countVar <- newTVarIO 0
      -- A single dispatcher thread; it forks one thread per job once a
      -- semaphore slot has been acquired.
      void (forkIO (worker sem inqVar outqVar))
      return
        JobControl
          { spawnJob = spawn inqVar countVar
          , collectJob = collect outqVar countVar
          , remainingJobs = remaining countVar
          , cancelJobs = cancel inqVar countVar
          , cleanupJobControl = destroySemaphore sem
          , jobControlSemaphore = Just (semaphoreName sem)
          }
  where
    -- Dispatcher loop: take a queued job, wait for a semaphore slot, then run
    -- the job in its own thread, which releases the slot and publishes the
    -- outcome when the job ends.
    worker :: Semaphore -> TChan (IO a) -> TChan (Either SomeException a) -> IO ()
    worker sem inqVar outqVar =
      forever $ do
        job <- atomically $ readTChan inqVar
        -- mask here, as we need to ensure that the thread which contains the
        -- release action is spawned. Otherwise, there is the chance that an
        -- async exception is thrown between the semaphore being taken and the
        -- thread being spawned.
        mask_ $ do
          waitOnSemaphore sem
          void $ forkIOWithUnmask $ \unmask -> do
            res <- try (unmask job)
            releaseSemaphore sem 1
            atomically $ writeTChan outqVar res
        -- Try to give GHC enough time to compute the module graph and then
        -- request some additional capabilities if it can make use of them. The
        -- ideal situation is that we have 1 GHC process running which has taken
        -- all the capabilities in the semaphore, as this will reduce memory usage.
        --
        -- 0.25s is chosen by discussion between MP and SD on Mar 17th 2023 as a number
        -- which isn't too big and not too small but also, not scientifically.
        threadDelay 250000

    -- Count the job as outstanding and queue it, in a single transaction.
    spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
    spawn inqVar countVar job =
      atomically $ do
        modifyTVar' countVar (+ 1)
        writeTChan inqVar job

    -- Wait for one finished job; rethrow its exception if it failed.
    collect :: TChan (Either SomeException a) -> TVar Int -> IO a
    collect outqVar countVar = do
      res <- atomically $ do
        modifyTVar' countVar (subtract 1)
        readTChan outqVar
      either throwIO return res

    remaining :: TVar Int -> IO Bool
    remaining countVar = fmap (/= 0) $ atomically $ readTVar countVar

    -- Remove every job still waiting in the input queue and adjust the
    -- outstanding count by the number removed.
    cancel :: TChan (IO a) -> TVar Int -> IO ()
    cancel inqVar countVar =
      atomically $ do
        xs <- readAllTChan inqVar
        modifyTVar' countVar (subtract (length xs))
252 -------------------------
253 -- Job limits and locks
-- | A limit on the number of jobs that may run at once, backed by a 'QSem'.
-- Create one with 'newJobLimit' and run actions under it with 'withJobLimit'.
--
-- Declared as a @newtype@ (single constructor, single field), matching 'Lock'.
newtype JobLimit = JobLimit QSem
-- | Create a 'JobLimit' whose underlying semaphore starts with the given
-- number of units.
newJobLimit :: Int -> IO JobLimit
newJobLimit slots = JobLimit <$> newQSem slots
-- | Run an action while holding one unit of the 'JobLimit'. The unit is
-- acquired before the action starts and handed back afterwards via
-- 'bracket_', so it is also returned when the action throws.
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit slots) action =
  bracket_ (waitQSem slots) (signalQSem slots) action
-- | A mutual-exclusion lock, represented as an 'MVar' holding @()@. Create
-- one with 'newLock' and guard actions with 'criticalSection'.
newtype Lock = Lock (MVar ())
-- | Create a new 'Lock'. Its 'MVar' starts full, i.e. the lock is free.
newLock :: IO Lock
newLock = Lock <$> newMVar ()
-- | Run an action while holding the 'Lock'. The 'MVar' is emptied before the
-- action runs and refilled afterwards via 'bracket_', so the lock is also
-- released when the action throws.
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock mutex) =
  bracket_ acquire release
  where
    acquire = takeMVar mutex
    release = putMVar mutex ()
274 --------------------------------------------------------------------------------
275 -- More high level utils
276 --------------------------------------------------------------------------------
-- | Pick and construct the 'JobControl' that matches a parallel strategy:
-- serial, a fixed number of parallel jobs, or semaphore-controlled. The
-- semaphore strategy falls back to plain parallel job control (with a warning)
-- when the compiler does not support @-jsem@.
newJobControlFromParStrat
  :: Verbosity
  -> Compiler
  -> ParStratInstall
  -- ^ The parallel strategy
  -> Maybe Int
  -- ^ A cap on the number of jobs (e.g. to force a maximum of 2 concurrent downloads despite a -j8 parallel strategy)
  -> IO (JobControl IO a)
newJobControlFromParStrat verbosity compiler parStrat numJobsCap =
  case parStrat of
    Serial -> newSerialJobControl
    NumJobs requested ->
      newParallelJobControl (capped (fromMaybe numberOfProcessors requested))
    UseSem slots
      | jsemSupported compiler ->
          newSemaphoreJobControl verbosity (capped slots)
      | otherwise -> do
          warn verbosity "-jsem is not supported by the selected compiler, falling back to normal parallelism control."
          newParallelJobControl (capped slots)
  where
    -- Apply the optional cap; without a cap the job count is left unchanged.
    capped :: Int -> Int
    capped = maybe id min numJobsCap
-- | Create a 'JobControl' with the given action, pass it to the continuation,
-- and run its 'cleanupJobControl' afterwards via 'bracket', so cleanup also
-- happens when the continuation throws.
withJobControl :: IO (JobControl IO a) -> (JobControl IO a -> IO b) -> IO b
withJobControl mkJC body = bracket mkJC cleanupJobControl body
-- | Apply an action to every element of a list, running the actions as jobs
-- of the given 'JobControl'. How many jobs run at once is determined by the
-- 'JobControl' instance.
--
-- All jobs are spawned first; then one result is collected per input element.
-- The resulting list does /not/ preserve the original order!
mapConcurrentWithJobs :: JobControl IO b -> (a -> IO b) -> [a] -> IO [b]
mapConcurrentWithJobs jobControl f xs =
  traverse_ enqueue xs *> traverse awaitOne xs
  where
    enqueue = spawnJob jobControl . f
    awaitOne _ = collectJob jobControl