Adjust tests in case running as root
[scons.git] / SCons / CacheDirTests.py
blobe90478839d2752d60980bdd6282058e79a79850e
1 # MIT License
3 # Copyright The SCons Foundation
5 # Permission is hereby granted, free of charge, to any person obtaining
6 # a copy of this software and associated documentation files (the
7 # "Software"), to deal in the Software without restriction, including
8 # without limitation the rights to use, copy, modify, merge, publish,
9 # distribute, sublicense, and/or sell copies of the Software, and to
10 # permit persons to whom the Software is furnished to do so, subject to
11 # the following conditions:
13 # The above copyright notice and this permission notice shall be included
14 # in all copies or substantial portions of the Software.
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
17 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
18 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 import os.path
25 import shutil
26 import sys
27 import unittest
28 import tempfile
29 import stat
31 from TestCmd import TestCmd
33 import SCons.CacheDir
# Module-level flag that the fake Action sets to 1 when it is executed.
built_it = None

# Environment probes used by the skip decorators further down.
IS_WINDOWS = sys.platform == 'win32'
# Where os.geteuid is unavailable, treat the process as not running as root.
IS_ROOT = hasattr(os, 'geteuid') and os.geteuid() == 0
class Action:
    """Test double for an action; notes in ``built_it`` when it is run."""

    def __call__(self, targets, sources, env, **kw) -> int:
        """Set the module-level ``built_it`` to 1 unless ``execute`` is falsy.

        ``execute`` is read from the keyword arguments and defaults to 1.
        Always returns 0.
        """
        global built_it
        should_execute = kw.get('execute', 1)
        if not should_execute:
            return 0
        built_it = 1
        return 0

    def genstring(self, target, source, env):
        """Return ``str(self)``; the arguments are not used."""
        return str(self)

    def get_contents(self, target, source, env):
        """Return an empty ``bytearray``; the arguments are not used."""
        return bytearray('', 'utf-8')
class Builder:
    """Test double for a builder: stores an environment and an action."""

    def __init__(self, environment, action) -> None:
        """Keep the given environment/action; start with no overrides or scanners."""
        self.env, self.action = environment, action
        self.overrides = {}
        self.source_scanner = self.target_scanner = None
class Environment:
    """Test double for a construction environment bound to one cache dir."""

    def __init__(self, cachedir) -> None:
        """Remember the cache directory object handed in."""
        self.cachedir = cachedir

    def Override(self, overrides):
        """Ignore ``overrides`` and hand back this same environment."""
        return self

    def get_CacheDir(self):
        """Return the cache directory object given at construction."""
        return self.cachedir
class BaseTestCase(unittest.TestCase):
    """
    Shared fixture for the test classes below: a TestCmd instance, an
    SCons FS object and a CacheDir created at the path 'cache'.
    """
    def setUp(self) -> None:
        """Create the TestCmd helper, the FS object and the CacheDir."""
        self.test = TestCmd(workdir='')

        import SCons.Node.FS
        self.fs = SCons.Node.FS.FS()

        self._CacheDir = SCons.CacheDir.CacheDir('cache')

    def File(self, name, bsig=None, action=Action()):
        """Return a File node for ``name`` wired to a fake builder.

        The builder wraps an Environment bound to this fixture's CacheDir
        and the given ``action``.  When ``bsig`` is truthy it is stored on
        the node as ``cachesig``.
        """
        node = self.fs.File(name)
        fake_builder = Builder(Environment(self._CacheDir), action)
        node.builder_set(fake_builder)
        if bsig:
            node.cachesig = bsig
        return node

    def tearDown(self) -> None:
        """Delete the cache's 'config' file, then the cache directory itself."""
        cache_path = self._CacheDir.path
        os.remove(os.path.join(cache_path, 'config'))
        os.rmdir(cache_path)
class CacheDirTestCase(BaseTestCase):
    """
    Test calling CacheDir code directly.
    """
    def test_cachepath(self) -> None:
        """Test the cachepath() method"""

        # Verify how the cachepath() method determines the name
        # of the file in cache.  SCons.Util.hash_collect is temporarily
        # replaced by a stub that returns its first signature unchanged.
        def my_collect(signatures, hash_format=None):
            return signatures[0]

        save_collect = SCons.Util.hash_collect
        SCons.Util.hash_collect = my_collect

        try:
            name = 'a_fake_bsig'
            f5 = self.File("cd.f5", name)
            result = self._CacheDir.cachepath(f5)
            # Named prefix_len (not ``len``) so the builtin is not shadowed.
            prefix_len = self._CacheDir.config['prefix_len']
            # Expected layout: a directory named after the upper-cased
            # signature prefix, holding a file named after the signature.
            dirname = os.path.join('cache', name.upper()[:prefix_len])
            filename = os.path.join(dirname, name)
            self.assertEqual(result, (dirname, filename))
        finally:
            # Always put the real hash_collect back for later tests.
            SCons.Util.hash_collect = save_collect
class ExceptionTestCase(unittest.TestCase):
    """Test that the correct exceptions are thrown by CacheDir."""

    # Don't inherit from BaseTestCase, we're by definition trying to
    # break things so we really want a clean slate for each test.
    def setUp(self) -> None:
        """Create a CacheDir inside a fresh temporary directory."""
        self.tmpdir = tempfile.mkdtemp()
        self._CacheDir = SCons.CacheDir.CacheDir(self.tmpdir)

    def tearDown(self) -> None:
        """Remove the temporary directory and everything in it."""
        shutil.rmtree(self.tmpdir)

    @unittest.skipIf(
        IS_WINDOWS,
        "Skip privileged CacheDir test on Windows, cannot change directory rights",
    )
    @unittest.skipIf(
        IS_ROOT,
        "Skip privileged CacheDir test if running as root.",
    )
    def test_throws_correct_on_OSError(self) -> None:
        """Test for correct error when cache directory cannot be created."""
        test = TestCmd()
        privileged_dir = os.path.join(self.tmpdir, "privileged")
        cachedir_path = os.path.join(privileged_dir, "cache")
        os.makedirs(privileged_dir, exist_ok=True)
        test.writable(privileged_dir, False)
        try:
            with self.assertRaises(SCons.Errors.SConsEnvironmentError) as cm:
                SCons.CacheDir.CacheDir(cachedir_path)
            self.assertEqual(
                str(cm.exception),
                "Failed to create cache directory " + cachedir_path
            )
        finally:
            # Restore permissions and clean up even when an assertion
            # fails, so the fixture is left the way it was found.
            test.writable(privileged_dir, True)
            shutil.rmtree(privileged_dir)

    def test_throws_correct_when_failed_to_write_configfile(self) -> None:
        """Test for correct error when the cache config cannot be written."""
        class Unserializable:
            """A class which the JSON should not be able to serialize"""

            def __init__(self, oldconfig) -> None:
                self.something = 1  # Make the object unserializable
                # Pretend to be the old config just enough
                self.__dict__["prefix_len"] = oldconfig["prefix_len"]

            def __getitem__(self, name, default=None):
                if name == "prefix_len":
                    return self.__dict__["prefix_len"]
                else:
                    return None

            def __setitem__(self, name, value) -> None:
                self.__dict__[name] = value

        oldconfig = self._CacheDir.config
        self._CacheDir.config = Unserializable(oldconfig)
        # Remove the config file that got created on object creation
        # so that _readconfig* will try to rewrite it
        old_config = os.path.join(self._CacheDir.path, "config")
        os.remove(old_config)

        with self.assertRaises(SCons.Errors.SConsEnvironmentError) as cm:
            self._CacheDir._readconfig(self._CacheDir.path)
        self.assertEqual(
            str(cm.exception),
            "Failed to write cache configuration for {}".format(self._CacheDir.path)
        )

    def test_raise_environment_error_on_invalid_json(self) -> None:
        """Test for correct error when the cache config is not valid JSON."""
        config_file = os.path.join(self._CacheDir.path, "config")
        # Appending a second JSON document after the existing content
        # will make JSON load raise a ValueError
        with open(config_file, "a") as cfg:
            cfg.write("{}")

        with self.assertRaises(SCons.Errors.SConsEnvironmentError) as cm:
            # Construct a new cache dir that will try to read the invalid config
            SCons.CacheDir.CacheDir(self._CacheDir.path)
        self.assertEqual(
            str(cm.exception),
            "Failed to read cache configuration for {}".format(self._CacheDir.path)
        )
class FileTestCase(BaseTestCase):
    """
    Test calling CacheDir code through Node.FS.File interfaces.
    """
    # These tests were originally in Nodes/FSTests.py and got moved
    # when the CacheDir support was refactored into its own module.
    # Look in the history for Node/FSTests.py if any of this needs
    # to be re-examined.
    def retrieve_succeed(self, target, source, env, execute: int=1) -> int:
        """Stand-in retrieve function: record ``target`` and return 0."""
        self.retrieved.append(target)
        return 0

    def retrieve_fail(self, target, source, env, execute: int=1) -> int:
        """Stand-in retrieve function: record ``target`` and return 1."""
        self.retrieved.append(target)
        return 1

    def push(self, target, source, env) -> int:
        """Stand-in push function: record ``target`` and return 0."""
        self.pushed.append(target)
        return 0

    def test_CacheRetrieve(self) -> None:
        """Test the CacheRetrieve() function"""
        # Without this declaration ``built_it = None`` would only bind a
        # local, and the checks below could never observe the flag that
        # the fake Action sets at module level.
        global built_it

        save_CacheRetrieve = SCons.CacheDir.CacheRetrieve
        save_cache_show = SCons.CacheDir.cache_show

        f1 = self.File("cd.f1")
        try:
            # NOTE(review): this test expects retrieval to go through
            # CacheRetrieve, i.e. cache_show off; pin it here rather than
            # rely on what other tests left behind.  Verify against
            # SCons.CacheDir.CacheDir.retrieve.
            SCons.CacheDir.cache_show = 0

            SCons.CacheDir.CacheRetrieve = self.retrieve_succeed
            self.retrieved = []
            built_it = None

            r = f1.retrieve_from_cache()
            assert r == 1, r
            assert self.retrieved == [f1], self.retrieved
            assert built_it is None, built_it

            SCons.CacheDir.CacheRetrieve = self.retrieve_fail
            self.retrieved = []
            built_it = None

            r = f1.retrieve_from_cache()
            assert not r, r
            assert self.retrieved == [f1], self.retrieved
            assert built_it is None, built_it
        finally:
            SCons.CacheDir.CacheRetrieve = save_CacheRetrieve
            SCons.CacheDir.cache_show = save_cache_show

    def test_CacheRetrieveSilent(self) -> None:
        """Test the CacheRetrieveSilent() function"""
        global built_it

        save_CacheRetrieveSilent = SCons.CacheDir.CacheRetrieveSilent
        save_cache_show = SCons.CacheDir.cache_show

        f2 = self.File("cd.f2", 'f2_bsig')
        try:
            SCons.CacheDir.cache_show = 1

            SCons.CacheDir.CacheRetrieveSilent = self.retrieve_succeed
            self.retrieved = []
            built_it = None

            r = f2.retrieve_from_cache()
            assert r == 1, r
            assert self.retrieved == [f2], self.retrieved
            assert built_it is None, built_it

            SCons.CacheDir.CacheRetrieveSilent = self.retrieve_fail
            self.retrieved = []
            built_it = None

            r = f2.retrieve_from_cache()
            assert r is False, r
            assert self.retrieved == [f2], self.retrieved
            assert built_it is None, built_it
        finally:
            SCons.CacheDir.CacheRetrieveSilent = save_CacheRetrieveSilent
            # Do not leak the flag into tests that run afterwards.
            SCons.CacheDir.cache_show = save_cache_show

    def test_CachePush(self) -> None:
        """Test the CachePush() function"""

        save_CachePush = SCons.CacheDir.CachePush
        save_cache_force = SCons.CacheDir.cache_force

        SCons.CacheDir.CachePush = self.push

        try:
            self.pushed = []

            # push_to_cache() pushes only once the file exists on disk.
            cd_f3 = self.test.workpath("cd.f3")
            f3 = self.File(cd_f3)
            f3.push_to_cache()
            assert self.pushed == [], self.pushed
            self.test.write(cd_f3, "cd.f3\n")
            f3.push_to_cache()
            assert self.pushed == [f3], self.pushed

            self.pushed = []

            # visited() pushes only when cache_force is switched on.
            cd_f4 = self.test.workpath("cd.f4")
            f4 = self.File(cd_f4)
            f4.visited()
            assert self.pushed == [], self.pushed
            self.test.write(cd_f4, "cd.f4\n")
            f4.clear()
            f4.visited()
            assert self.pushed == [], self.pushed
            SCons.CacheDir.cache_force = 1
            f4.clear()
            f4.visited()
            assert self.pushed == [f4], self.pushed
        finally:
            SCons.CacheDir.CachePush = save_CachePush
            # Do not leak the flag into tests that run afterwards.
            SCons.CacheDir.cache_force = save_cache_force

    def test_warning(self) -> None:
        """Test raising a warning if we can't copy a file to cache."""

        # Replace shutil.copy2 with a version that always fails and
        # os.mkdir with a no-op for the duration of the test.
        save_copy2 = shutil.copy2
        def copy2(src, dst):
            raise OSError
        save_mkdir = os.mkdir
        def mkdir(dir, mode: int=0) -> None:
            pass

        shutil.copy2 = copy2
        os.mkdir = mkdir
        old_warn_exceptions = SCons.Warnings.warningAsException(1)
        SCons.Warnings.enableWarningClass(SCons.Warnings.CacheWriteErrorWarning)

        try:
            cd_f7 = self.test.workpath("cd.f7")
            self.test.write(cd_f7, "cd.f7\n")
            f7 = self.File(cd_f7, 'f7_bsig')

            r = f7.push_to_cache()
            assert r.exc_info[0] == SCons.Warnings.CacheWriteErrorWarning
        finally:
            # Undo the monkeypatches even if the test fails; a stubbed-out
            # os.mkdir left in place would affect every later test.
            shutil.copy2 = save_copy2
            os.mkdir = save_mkdir
            SCons.Warnings.warningAsException(old_warn_exceptions)
            SCons.Warnings.suppressWarningClass(SCons.Warnings.CacheWriteErrorWarning)

    def test_no_strfunction(self) -> None:
        """Test handling no strfunction() for an action."""
        global built_it

        save_CacheRetrieveSilent = SCons.CacheDir.CacheRetrieveSilent
        save_cache_show = SCons.CacheDir.cache_show

        f8 = self.File("cd.f8", 'f8_bsig')
        try:
            # This test patches CacheRetrieveSilent, so switch cache_show on
            # here (as test_CacheRetrieveSilent does) instead of depending
            # on another test having left it set.
            SCons.CacheDir.cache_show = 1

            SCons.CacheDir.CacheRetrieveSilent = self.retrieve_succeed
            self.retrieved = []
            built_it = None

            r = f8.retrieve_from_cache()
            assert r == 1, r
            assert self.retrieved == [f8], self.retrieved
            assert built_it is None, built_it

            SCons.CacheDir.CacheRetrieveSilent = self.retrieve_fail
            self.retrieved = []
            built_it = None

            r = f8.retrieve_from_cache()
            assert r is False, r
            assert self.retrieved == [f8], self.retrieved
            assert built_it is None, built_it
        finally:
            SCons.CacheDir.CacheRetrieveSilent = save_CacheRetrieveSilent
            SCons.CacheDir.cache_show = save_cache_show
# Run the unittest runner when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
374 # Local Variables:
375 # tab-width:4
376 # indent-tabs-mode:nil
377 # End:
378 # vim: set expandtab tabstop=4 shiftwidth=4: