cabal init -i should sanitize suggested package name (fix #8404) (#8561)
[cabal.git] / cabal-install / src / Distribution / Client / JobControl.hs
blobc9c16647ac1af65d223aa741d78fb6bd8e20774a
1 {-# LANGUAGE FlexibleContexts #-}
2 -----------------------------------------------------------------------------
3 -- |
4 -- Module : Distribution.Client.JobControl
5 -- Copyright : (c) Duncan Coutts 2012
6 -- License : BSD-like
7 --
8 -- Maintainer : cabal-devel@haskell.org
9 -- Stability : provisional
10 -- Portability : portable
12 -- A job control concurrency abstraction
13 -----------------------------------------------------------------------------
14 module Distribution.Client.JobControl (
15 JobControl,
16 newSerialJobControl,
17 newParallelJobControl,
18 spawnJob,
19 collectJob,
20 remainingJobs,
21 cancelJobs,
23 JobLimit,
24 newJobLimit,
25 withJobLimit,
27 Lock,
28 newLock,
29 criticalSection
30 ) where
32 import Distribution.Client.Compat.Prelude
33 import Prelude ()
35 import Control.Monad (forever, replicateM_)
36 import Control.Concurrent (forkIO)
37 import Control.Concurrent.MVar
38 import Control.Concurrent.STM (STM, atomically)
39 import Control.Concurrent.STM.TVar
40 import Control.Concurrent.STM.TChan
41 import Control.Exception (bracket_, try)
42 import Distribution.Compat.Stack
43 import Distribution.Client.Compat.Semaphore
-- | A simple concurrency abstraction. Jobs can be spawned and can complete
-- in any order. This allows both serial and parallel implementations.
--
-- The record is parameterised by the monad @m@ in which jobs run and by the
-- result type @a@ shared by all jobs of one controller.
--
data JobControl m a = JobControl {
       -- | Add a new job to the pool of jobs
       spawnJob      :: m a -> m (),

       -- | Wait until one job is complete
       collectJob    :: m a,

       -- | Returns True if there are any outstanding jobs
       -- (ie spawned but yet to be collected)
       remainingJobs :: m Bool,

       -- | Try to cancel any outstanding but not-yet-started jobs.
       -- Call 'remainingJobs' after this to find out if any jobs are left
       -- (ie could not be cancelled).
       cancelJobs    :: m ()
     }
-- | Make a 'JobControl' that executes all jobs serially and in order.
-- It only executes jobs on demand when they are collected, not eagerly.
--
-- Cancelling will cancel /all/ jobs that have not been collected yet.
--
newSerialJobControl :: IO (JobControl IO a)
newSerialJobControl = do
    qVar <- newTChanIO
    return JobControl {
      spawnJob      = spawn     qVar,
      collectJob    = collect   qVar,
      remainingJobs = remaining qVar,
      cancelJobs    = cancel    qVar
    }
  where
    -- Only enqueue the action; nothing is run at spawn time.
    spawn :: TChan (IO a) -> IO a -> IO ()
    spawn qVar job = atomically $ writeTChan qVar job

    -- Take the oldest queued action off the channel and run it in the
    -- calling thread ('join' executes the action that the read returns).
    collect :: TChan (IO a) -> IO a
    collect qVar =
      join $ atomically $ readTChan qVar

    -- Jobs remain exactly when the queue is non-empty.
    remaining :: TChan (IO a) -> IO Bool
    remaining qVar = fmap not $ atomically $ isEmptyTChan qVar

    -- Drain the queue, discarding the actions without running them.
    cancel :: TChan (IO a) -> IO ()
    cancel qVar = do
      _ <- atomically $ readAllTChan qVar
      return ()
-- | Make a 'JobControl' that eagerly executes jobs in parallel, with a given
-- maximum degree of parallelism.
--
-- Cancelling will cancel jobs that have not yet begun executing, but jobs
-- that have already been executed or are currently executing cannot be
-- cancelled.
--
-- The requested number of jobs must lie between 1 and 1000 inclusive;
-- any other value is rejected with a call to 'error'.
--
newParallelJobControl :: WithCallStack (Int -> IO (JobControl IO a))
newParallelJobControl n | n < 1 || n > 1000 =
  error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
newParallelJobControl maxJobLimit = do
    inqVar   <- newTChanIO
    outqVar  <- newTChanIO
    -- Number of jobs spawned but neither collected nor cancelled yet.
    countVar <- newTVarIO 0
    -- One worker thread per unit of parallelism, all sharing both queues.
    replicateM_ maxJobLimit $
      forkIO $
        worker inqVar outqVar
    return JobControl {
      spawnJob      = spawn     inqVar  countVar,
      collectJob    = collect   outqVar countVar,
      remainingJobs = remaining countVar,
      cancelJobs    = cancel    inqVar  countVar
    }
  where
    -- Loop forever: take a job from the input queue, run it, and post its
    -- result (or the exception it raised) on the output queue.
    worker ::  TChan (IO a) -> TChan (Either SomeException a) -> IO ()
    worker inqVar outqVar =
      forever $ do
        job <- atomically $ readTChan inqVar
        res <- try job
        atomically $ writeTChan outqVar res

    -- Count the job and enqueue it in a single transaction.
    spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
    spawn inqVar countVar job =
      atomically $ do
        modifyTVar' countVar (+1)
        writeTChan inqVar job

    -- Take one finished result and decrement the counter in the same
    -- transaction; an exception captured by the worker is re-thrown here.
    collect :: TChan (Either SomeException a) -> TVar Int -> IO a
    collect outqVar countVar = do
      res <- atomically $ do
        modifyTVar' countVar (subtract 1)
        readTChan outqVar
      either throwIO return res

    remaining :: TVar Int -> IO Bool
    remaining countVar = fmap (/=0) $ atomically $ readTVar countVar

    -- Remove every job still waiting in the input queue and subtract that
    -- many from the outstanding-job counter.
    cancel :: TChan (IO a) -> TVar Int -> IO ()
    cancel inqVar countVar =
      atomically $ do
        xs <- readAllTChan inqVar
        modifyTVar' countVar (subtract (length xs))
-- | Drain a 'TChan' inside a single transaction, returning every item that
-- was queued, oldest first. An empty channel yields an empty list.
readAllTChan :: TChan a -> STM [a]
readAllTChan chan = do
    next <- tryReadTChan chan
    case next of
      Just item -> do
        rest <- readAllTChan chan
        return (item : rest)
      Nothing -> return []
159 -------------------------
160 -- Job limits and locks
-- | A bound on how many actions may run at once, represented by a 'QSem'.
-- Declared as a @newtype@ (like 'Lock') since it wraps exactly one field.
newtype JobLimit = JobLimit QSem
-- | Create a 'JobLimit' whose underlying semaphore is initialised with @n@.
newJobLimit :: Int -> IO JobLimit
newJobLimit n = do
    sem <- newQSem n
    return (JobLimit sem)
-- | Run an action under a 'JobLimit': wait on the semaphore before the
-- action and signal it afterwards. 'bracket_' ensures the signal happens
-- even if the action throws.
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit sem) =
    bracket_ acquire release
  where
    acquire = waitQSem sem
    release = signalQSem sem
-- | A mutual-exclusion lock, represented by an 'MVar' holding unit.
newtype Lock = Lock (MVar ())
-- | Create a new 'Lock'. Its 'MVar' starts out full.
newLock :: IO Lock
newLock = do
    mv <- newMVar ()
    return (Lock mv)
-- | Run an action while holding the 'Lock': the 'MVar' is emptied before
-- the action and refilled afterwards. 'bracket_' ensures the refill happens
-- even if the action throws.
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock mv) act =
    bracket_ enter leave act
  where
    enter = takeMVar mv
    leave = putMVar mv ()