1 {-# LANGUAGE DeriveDataTypeable #-}
3 module UnitTests
.Distribution
.Client
.JobControl
(tests
) where
5 import Distribution
.Client
.JobControl
7 import Distribution
.Client
.Compat
.Prelude
10 import Control
.Concurrent
(threadDelay
)
11 import Control
.Exception
(try)
12 import Control
.Monad
(replicateM
, replicateM_
)
13 import Data
.IORef
(atomicModifyIORef
, newIORef
)
14 import qualified Data
.Set
as Set
17 import Test
.Tasty
.QuickCheck
hiding (collect
)
23 [ testProperty
"submit batch" prop_submit_serial
24 , testProperty
"submit batch" prop_remaining_serial
25 , testProperty
"submit interleaved" prop_interleaved_serial
26 , testProperty
"concurrent jobs" prop_concurrent_serial
27 , testProperty
"cancel" prop_cancel_serial
28 , testProperty
"exceptions" prop_exception_serial
32 [ testProperty
"submit batch" prop_submit_parallel
33 , testProperty
"submit batch" prop_remaining_parallel
34 , testProperty
"submit interleaved" prop_interleaved_parallel
35 , testProperty
"concurrent jobs" prop_concurrent_parallel
36 , testProperty
"cancel" prop_cancel_parallel
37 , testProperty
"exceptions" prop_exception_parallel
41 prop_submit_serial
:: [Int] -> Property
42 prop_submit_serial xs
=
44 jobCtl
<- newSerialJobControl
47 prop_submit_parallel
:: Positive
(Small
Int) -> [Int] -> Property
48 prop_submit_parallel
(Positive
(Small maxJobLimit
)) xs
=
50 jobCtl
<- newParallelJobControl maxJobLimit
53 prop_remaining_serial
:: [Int] -> Property
54 prop_remaining_serial xs
=
56 jobCtl
<- newSerialJobControl
57 prop_remaining jobCtl xs
59 prop_remaining_parallel
:: Positive
(Small
Int) -> [Int] -> Property
60 prop_remaining_parallel
(Positive
(Small maxJobLimit
)) xs
=
62 jobCtl
<- newParallelJobControl maxJobLimit
63 prop_remaining jobCtl xs
65 prop_interleaved_serial
:: [Int] -> Property
66 prop_interleaved_serial xs
=
68 jobCtl
<- newSerialJobControl
69 prop_submit_interleaved jobCtl xs
71 prop_interleaved_parallel
:: Positive
(Small
Int) -> [Int] -> Property
72 prop_interleaved_parallel
(Positive
(Small maxJobLimit
)) xs
=
74 jobCtl
<- newParallelJobControl maxJobLimit
75 prop_submit_interleaved jobCtl xs
77 prop_submit
:: JobControl
IO Int -> [Int] -> IO Bool
78 prop_submit jobCtl xs
= do
79 traverse_
(\x
-> spawnJob jobCtl
(return x
)) xs
80 xs
' <- traverse
(\_
-> collectJob jobCtl
) xs
81 return (sort xs
== sort xs
')
83 prop_remaining
:: JobControl
IO Int -> [Int] -> IO Bool
84 prop_remaining jobCtl xs
= do
85 traverse_
(\x
-> spawnJob jobCtl
(return x
)) xs
86 xs
' <- collectRemainingJobs jobCtl
87 return (sort xs
== sort xs
')
89 collectRemainingJobs
:: Monad m
=> JobControl m a
-> m
[a
]
90 collectRemainingJobs jobCtl
= go
[]
93 remaining
<- remainingJobs jobCtl
96 x
<- collectJob jobCtl
100 prop_submit_interleaved
:: JobControl
IO (Maybe Int) -> [Int] -> IO Bool
101 prop_submit_interleaved jobCtl xs
= do
106 map (\x
-> spawnJob jobCtl
(return (Just x
))) xs
107 ++ repeat (return ())
109 replicate 5 (return Nothing
)
110 ++ map (\_
-> collectJob jobCtl
) xs
111 , (spawn
, collect
) <- zip spawns collects
113 return (sort xs
== sort (catMaybes xs
'))
115 prop_concurrent_serial
:: NonNegative
(Small
Int) -> Property
116 prop_concurrent_serial
(NonNegative
(Small ntasks
)) =
118 jobCtl
<- newSerialJobControl
119 countRef
<- newIORef
(0 :: Int)
120 replicateM_ ntasks
(spawnJob jobCtl
(task countRef
))
121 counts
<- replicateM ntasks
(collectJob jobCtl
)
123 length counts
== ntasks
124 && all (\(n0
, n1
) -> n0
== 0 && n1
== 1) counts
127 n0
<- atomicModifyIORef countRef
(\n -> (n
+ 1, n
))
129 n1
<- atomicModifyIORef countRef
(\n -> (n
- 1, n
))
132 prop_concurrent_parallel
:: Positive
(Small
Int) -> NonNegative
Int -> Property
133 prop_concurrent_parallel
(Positive
(Small maxJobLimit
)) (NonNegative ntasks
) =
135 jobCtl
<- newParallelJobControl maxJobLimit
136 countRef
<- newIORef
(0 :: Int)
137 replicateM_ ntasks
(spawnJob jobCtl
(task countRef
))
138 counts
<- replicateM ntasks
(collectJob jobCtl
)
140 length counts
== ntasks
149 -- we do hit the concurrency limit (in the right circumstances)
150 && if ntasks
>= maxJobLimit
* 2 -- give us enough of a margin
151 then any (\(_
, n1
) -> n1
== maxJobLimit
) counts
155 n0
<- atomicModifyIORef countRef
(\n -> (n
+ 1, n
))
157 n1
<- atomicModifyIORef countRef
(\n -> (n
- 1, n
))
160 prop_cancel_serial
:: [Int] -> [Int] -> Property
161 prop_cancel_serial xs ys
=
163 jobCtl
<- newSerialJobControl
164 traverse_
(\x
-> spawnJob jobCtl
(return x
)) (xs
++ ys
)
165 xs
' <- traverse
(\_
-> collectJob jobCtl
) xs
167 ys
' <- collectRemainingJobs jobCtl
168 return (sort xs
== sort xs
' && null ys
')
170 prop_cancel_parallel
:: Positive
(Small
Int) -> [Int] -> [Int] -> Property
171 prop_cancel_parallel
(Positive
(Small maxJobLimit
)) xs ys
= do
173 jobCtl
<- newParallelJobControl maxJobLimit
174 traverse_
(\x
-> spawnJob jobCtl
(threadDelay
100 >> return x
)) (xs
++ ys
)
175 xs
' <- traverse
(\_
-> collectJob jobCtl
) xs
177 ys
' <- collectRemainingJobs jobCtl
178 return $ Set
.fromList
(xs
' ++ ys
') `Set
.isSubsetOf` Set
.fromList
(xs
++ ys
)
180 data TestException
= TestException
Int
181 deriving (Typeable
, Show)
183 instance Exception TestException
185 prop_exception_serial
:: [Either Int Int] -> Property
186 prop_exception_serial xs
=
188 jobCtl
<- newSerialJobControl
189 prop_exception jobCtl xs
191 prop_exception_parallel
:: Positive
(Small
Int) -> [Either Int Int] -> Property
192 prop_exception_parallel
(Positive
(Small maxJobLimit
)) xs
=
194 jobCtl
<- newParallelJobControl maxJobLimit
195 prop_exception jobCtl xs
197 prop_exception
:: JobControl
IO Int -> [Either Int Int] -> IO Bool
198 prop_exception jobCtl xs
= do
199 traverse_
(\x
-> spawnJob jobCtl
(either (throwIO
. TestException
) return x
)) xs
200 xs
' <- replicateM
(length xs
) $ do
201 mx
<- try (collectJob jobCtl
)
203 Left
(TestException n
) -> Left n
205 return (sort xs
== sort xs
')