Add “Ignore warning” option to cabal check
[cabal.git] / cabal-install / src / Distribution / Client / JobControl.hs
blobec14847851219a4e245fca71a62945bc1fe0ba52
1 {-# LANGUAGE FlexibleContexts #-}
3 -----------------------------------------------------------------------------
5 -----------------------------------------------------------------------------
7 -- |
8 -- Module : Distribution.Client.JobControl
9 -- Copyright : (c) Duncan Coutts 2012
10 -- License : BSD-like
12 -- Maintainer : cabal-devel@haskell.org
13 -- Stability : provisional
14 -- Portability : portable
16 -- A job control concurrency abstraction
17 module Distribution.Client.JobControl
18 ( JobControl
19 , newSerialJobControl
20 , newParallelJobControl
21 , newSemaphoreJobControl
22 , spawnJob
23 , collectJob
24 , remainingJobs
25 , cancelJobs
26 , cleanupJobControl
27 , jobControlSemaphore
28 , JobLimit
29 , newJobLimit
30 , withJobLimit
31 , Lock
32 , newLock
33 , criticalSection
34 ) where
36 import Distribution.Client.Compat.Prelude
37 import Prelude ()
39 import Control.Concurrent (forkIO, forkIOWithUnmask, threadDelay)
40 import Control.Concurrent.MVar
41 import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar', newTVarIO, readTVar)
42 import Control.Concurrent.STM.TChan
43 import Control.Exception (bracket_, mask_, try)
44 import Control.Monad (forever, replicateM_)
45 import Distribution.Client.Compat.Semaphore
46 import Distribution.Compat.Stack
47 import Distribution.Simple.Utils
48 import Distribution.Verbosity
49 import System.Semaphore
51 -- | A simple concurrency abstraction. Jobs can be spawned and can complete
52 -- in any order. This allows both serial and parallel implementations.
53 data JobControl m a = JobControl
54 { spawnJob :: m a -> m ()
55 -- ^ Add a new job to the pool of jobs
56 , collectJob :: m a
57 -- ^ Wait until one job is complete
58 , remainingJobs :: m Bool
59 -- ^ Returns True if there are any outstanding jobs
60 -- (ie spawned but yet to be collected)
61 , cancelJobs :: m ()
62 -- ^ Try to cancel any outstanding but not-yet-started jobs.
63 -- Call 'remainingJobs' after this to find out if any jobs are left
64 -- (ie could not be cancelled).
65 , cleanupJobControl :: m ()
66 -- ^ cleanup any resources created by the JobControl, intended to be used
67 -- as the finaliser for `bracket`.
68 , jobControlSemaphore :: Maybe SemaphoreName
69 -- ^ Name of the semaphore which can be used to control parallelism, if one
70 -- is available for that job control type.
73 -- | Make a 'JobControl' that executes all jobs serially and in order.
74 -- It only executes jobs on demand when they are collected, not eagerly.
76 -- Cancelling will cancel /all/ jobs that have not been collected yet.
77 newSerialJobControl :: IO (JobControl IO a)
78 newSerialJobControl = do
79 qVar <- newTChanIO
80 return
81 JobControl
82 { spawnJob = spawn qVar
83 , collectJob = collect qVar
84 , remainingJobs = remaining qVar
85 , cancelJobs = cancel qVar
86 , cleanupJobControl = return ()
87 , jobControlSemaphore = Nothing
89 where
90 spawn :: TChan (IO a) -> IO a -> IO ()
91 spawn qVar job = atomically $ writeTChan qVar job
93 collect :: TChan (IO a) -> IO a
94 collect qVar =
95 join $ atomically $ readTChan qVar
97 remaining :: TChan (IO a) -> IO Bool
98 remaining qVar = fmap not $ atomically $ isEmptyTChan qVar
100 cancel :: TChan (IO a) -> IO ()
101 cancel qVar = do
102 _ <- atomically $ readAllTChan qVar
103 return ()
105 -- | Make a 'JobControl' that eagerly executes jobs in parallel, with a given
106 -- maximum degree of parallelism.
108 -- Cancelling will cancel jobs that have not yet begun executing, but jobs
109 -- that have already been executed or are currently executing cannot be
110 -- cancelled.
111 newParallelJobControl :: WithCallStack (Int -> IO (JobControl IO a))
112 newParallelJobControl n
113 | n < 1 || n > 1000 =
114 error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
115 newParallelJobControl maxJobLimit = do
116 inqVar <- newTChanIO
117 outqVar <- newTChanIO
118 countVar <- newTVarIO 0
119 replicateM_ maxJobLimit $
120 forkIO $
121 worker inqVar outqVar
122 return
123 JobControl
124 { spawnJob = spawn inqVar countVar
125 , collectJob = collect outqVar countVar
126 , remainingJobs = remaining countVar
127 , cancelJobs = cancel inqVar countVar
128 , cleanupJobControl = return ()
129 , jobControlSemaphore = Nothing
131 where
132 worker :: TChan (IO a) -> TChan (Either SomeException a) -> IO ()
133 worker inqVar outqVar =
134 forever $ do
135 job <- atomically $ readTChan inqVar
136 res <- try job
137 atomically $ writeTChan outqVar res
139 spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
140 spawn inqVar countVar job =
141 atomically $ do
142 modifyTVar' countVar (+ 1)
143 writeTChan inqVar job
145 collect :: TChan (Either SomeException a) -> TVar Int -> IO a
146 collect outqVar countVar = do
147 res <- atomically $ do
148 modifyTVar' countVar (subtract 1)
149 readTChan outqVar
150 either throwIO return res
152 remaining :: TVar Int -> IO Bool
153 remaining countVar = fmap (/= 0) $ atomically $ readTVar countVar
155 cancel :: TChan (IO a) -> TVar Int -> IO ()
156 cancel inqVar countVar =
157 atomically $ do
158 xs <- readAllTChan inqVar
159 modifyTVar' countVar (subtract (length xs))
161 readAllTChan :: TChan a -> STM [a]
162 readAllTChan qvar = go []
163 where
164 go xs = do
165 mx <- tryReadTChan qvar
166 case mx of
167 Nothing -> return (reverse xs)
168 Just x -> go (x : xs)
170 -- | Make a 'JobControl' where the parallism is controlled by a semaphore.
172 -- This uses the GHC -jsem option to allow GHC to take additional semaphore slots
173 -- if we are not using them all.
174 newSemaphoreJobControl :: WithCallStack (Int -> IO (JobControl IO a))
175 newSemaphoreJobControl n
176 | n < 1 || n > 1000 =
177 error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
178 newSemaphoreJobControl maxJobLimit = do
179 sem <- freshSemaphore "cabal_semaphore" maxJobLimit
180 notice normal $
181 "Created semaphore called "
182 ++ getSemaphoreName (semaphoreName sem)
183 ++ " with "
184 ++ show maxJobLimit
185 ++ " slots."
186 outqVar <- newTChanIO
187 inqVar <- newTChanIO
188 countVar <- newTVarIO 0
189 void (forkIO (worker sem inqVar outqVar))
190 return
191 JobControl
192 { spawnJob = spawn inqVar countVar
193 , collectJob = collect outqVar countVar
194 , remainingJobs = remaining countVar
195 , cancelJobs = cancel inqVar countVar
196 , cleanupJobControl = destroySemaphore sem
197 , jobControlSemaphore = Just (semaphoreName sem)
199 where
200 worker :: Semaphore -> TChan (IO a) -> TChan (Either SomeException a) -> IO ()
201 worker sem inqVar outqVar =
202 forever $ do
203 job <- atomically $ readTChan inqVar
204 -- mask here, as we need to ensure that the thread which contains the
205 -- release action is spawned. Otherwise, there is the chance that an
206 -- async exception is thrown between the semaphore being taken and the
207 -- thread being spawned.
208 mask_ $ do
209 waitOnSemaphore sem
210 void $ forkIOWithUnmask $ \unmask -> do
211 res <- try (unmask job)
212 releaseSemaphore sem 1
213 atomically $ writeTChan outqVar res
214 -- Try to give GHC enough time to compute the module graph and then
215 -- request some additional capabilities if it can make use of them. The
216 -- ideal situation is that we have 1 GHC process running which has taken
217 -- all the capabilities in the semaphore, as this will reduce memory usage.
219 -- 0.25s is chosen by discussion between MP and SD on Mar 17th 2023 as a number
220 -- which isn't too big and not too small but also, not scientifically.
221 threadDelay 250000
223 spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
224 spawn inqVar countVar job =
225 atomically $ do
226 modifyTVar' countVar (+ 1)
227 writeTChan inqVar job
229 collect :: TChan (Either SomeException a) -> TVar Int -> IO a
230 collect outqVar countVar = do
231 res <- atomically $ do
232 modifyTVar' countVar (subtract 1)
233 readTChan outqVar
234 either throwIO return res
236 remaining :: TVar Int -> IO Bool
237 remaining countVar = fmap (/= 0) $ atomically $ readTVar countVar
239 cancel :: TChan (IO a) -> TVar Int -> IO ()
240 cancel inqVar countVar =
241 atomically $ do
242 xs <- readAllTChan inqVar
243 modifyTVar' countVar (subtract (length xs))
245 -------------------------
246 -- Job limits and locks
249 data JobLimit = JobLimit QSem
251 newJobLimit :: Int -> IO JobLimit
252 newJobLimit n =
253 fmap JobLimit (newQSem n)
255 withJobLimit :: JobLimit -> IO a -> IO a
256 withJobLimit (JobLimit sem) =
257 bracket_ (waitQSem sem) (signalQSem sem)
259 newtype Lock = Lock (MVar ())
261 newLock :: IO Lock
262 newLock = fmap Lock $ newMVar ()
264 criticalSection :: Lock -> IO a -> IO a
265 criticalSection (Lock lck) act = bracket_ (takeMVar lck) (putMVar lck ()) act