Update version number and release date.
[python/dscho.git] / Lib / test / test_tarfile.py
blob584b7b313af0a8b1a62d2638dd2e68c96e2aafa9
1 import sys
2 import os
3 import shutil
5 import unittest
6 import tarfile
8 from test import test_support
10 # Check for our compression modules.
11 try:
12 import gzip
13 gzip.GzipFile
14 except (ImportError, AttributeError):
15 gzip = None
16 try:
17 import bz2
18 except ImportError:
19 bz2 = None
21 def path(path):
22 return test_support.findfile(path)
24 testtar = path("testtar.tar")
25 tempdir = path("testtar.dir")
26 tempname = path("testtar.tmp")
27 membercount = 10
29 def tarname(comp=""):
30 if not comp:
31 return testtar
32 return "%s.%s" % (testtar, comp)
34 def dirname():
35 if not os.path.exists(tempdir):
36 os.mkdir(tempdir)
37 return tempdir
39 def tmpname():
40 return tempname
43 class BaseTest(unittest.TestCase):
44 comp = ''
45 mode = 'r'
46 sep = ':'
48 def setUp(self):
49 mode = self.mode + self.sep + self.comp
50 self.tar = tarfile.open(tarname(self.comp), mode)
52 def tearDown(self):
53 self.tar.close()
55 class ReadTest(BaseTest):
57 def test(self):
58 """Test member extraction.
59 """
60 members = 0
61 for tarinfo in self.tar:
62 members += 1
63 if not tarinfo.isreg():
64 continue
65 f = self.tar.extractfile(tarinfo)
66 self.assert_(len(f.read()) == tarinfo.size,
67 "size read does not match expected size")
68 f.close()
70 self.assert_(members == membercount,
71 "could not find all members")
73 def test_sparse(self):
74 """Test sparse member extraction.
75 """
76 if self.sep != "|":
77 f1 = self.tar.extractfile("S-SPARSE")
78 f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
79 self.assert_(f1.read() == f2.read(),
80 "_FileObject failed on sparse file member")
82 def test_readlines(self):
83 """Test readlines() method of _FileObject.
84 """
85 if self.sep != "|":
86 filename = "0-REGTYPE-TEXT"
87 self.tar.extract(filename, dirname())
88 lines1 = file(os.path.join(dirname(), filename), "rU").readlines()
89 lines2 = self.tar.extractfile(filename).readlines()
90 self.assert_(lines1 == lines2,
91 "_FileObject.readline() does not work correctly")
93 def test_seek(self):
94 """Test seek() method of _FileObject, incl. random reading.
95 """
96 if self.sep != "|":
97 filename = "0-REGTYPE"
98 self.tar.extract(filename, dirname())
99 data = file(os.path.join(dirname(), filename), "rb").read()
101 tarinfo = self.tar.getmember(filename)
102 fobj = self.tar.extractfile(tarinfo)
104 text = fobj.read()
105 fobj.seek(0)
106 self.assert_(0 == fobj.tell(),
107 "seek() to file's start failed")
108 fobj.seek(2048, 0)
109 self.assert_(2048 == fobj.tell(),
110 "seek() to absolute position failed")
111 fobj.seek(-1024, 1)
112 self.assert_(1024 == fobj.tell(),
113 "seek() to negative relative position failed")
114 fobj.seek(1024, 1)
115 self.assert_(2048 == fobj.tell(),
116 "seek() to positive relative position failed")
117 s = fobj.read(10)
118 self.assert_(s == data[2048:2058],
119 "read() after seek failed")
120 fobj.seek(0, 2)
121 self.assert_(tarinfo.size == fobj.tell(),
122 "seek() to file's end failed")
123 self.assert_(fobj.read() == "",
124 "read() at file's end did not return empty string")
125 fobj.seek(-tarinfo.size, 2)
126 self.assert_(0 == fobj.tell(),
127 "relative seek() to file's start failed")
128 fobj.seek(512)
129 s1 = fobj.readlines()
130 fobj.seek(512)
131 s2 = fobj.readlines()
132 self.assert_(s1 == s2,
133 "readlines() after seek failed")
134 fobj.close()
136 class ReadStreamTest(ReadTest):
137 sep = "|"
139 def test(self):
140 """Test member extraction, and for StreamError when
141 seeking backwards.
143 ReadTest.test(self)
144 tarinfo = self.tar.getmembers()[0]
145 f = self.tar.extractfile(tarinfo)
146 self.assertRaises(tarfile.StreamError, f.read)
148 def test_stream(self):
149 """Compare the normal tar and the stream tar.
151 stream = self.tar
152 tar = tarfile.open(tarname(), 'r')
154 while 1:
155 t1 = tar.next()
156 t2 = stream.next()
157 if t1 is None:
158 break
159 self.assert_(t2 is not None, "stream.next() failed.")
161 if t2.islnk() or t2.issym():
162 self.assertRaises(tarfile.StreamError, stream.extractfile, t2)
163 continue
164 v1 = tar.extractfile(t1)
165 v2 = stream.extractfile(t2)
166 if v1 is None:
167 continue
168 self.assert_(v2 is not None, "stream.extractfile() failed")
169 self.assert_(v1.read() == v2.read(), "stream extraction failed")
171 stream.close()
173 class WriteTest(BaseTest):
174 mode = 'w'
176 def setUp(self):
177 mode = self.mode + self.sep + self.comp
178 self.src = tarfile.open(tarname(self.comp), 'r')
179 self.dst = tarfile.open(tmpname(), mode)
181 def tearDown(self):
182 self.src.close()
183 self.dst.close()
185 def test_posix(self):
186 self.dst.posix = 1
187 self._test()
189 def test_nonposix(self):
190 self.dst.posix = 0
191 self._test()
193 def _test(self):
194 for tarinfo in self.src:
195 if not tarinfo.isreg():
196 continue
197 f = self.src.extractfile(tarinfo)
198 if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
199 self.assertRaises(ValueError, self.dst.addfile,
200 tarinfo, f)
201 else:
202 self.dst.addfile(tarinfo, f)
204 class WriteStreamTest(WriteTest):
205 sep = '|'
207 # Gzip TestCases
208 class ReadTestGzip(ReadTest):
209 comp = "gz"
210 class ReadStreamTestGzip(ReadStreamTest):
211 comp = "gz"
212 class WriteTestGzip(WriteTest):
213 comp = "gz"
214 class WriteStreamTestGzip(WriteStreamTest):
215 comp = "gz"
217 if bz2:
218 # Bzip2 TestCases
219 class ReadTestBzip2(ReadTestGzip):
220 comp = "bz2"
221 class ReadStreamTestBzip2(ReadStreamTestGzip):
222 comp = "bz2"
223 class WriteTestBzip2(WriteTest):
224 comp = "bz2"
225 class WriteStreamTestBzip2(WriteStreamTestGzip):
226 comp = "bz2"
228 # If importing gzip failed, discard the Gzip TestCases.
229 if not gzip:
230 del ReadTestGzip
231 del ReadStreamTestGzip
232 del WriteTestGzip
233 del WriteStreamTestGzip
235 def test_main():
236 if gzip:
237 # create testtar.tar.gz
238 gzip.open(tarname("gz"), "wb").write(file(tarname(), "rb").read())
239 if bz2:
240 # create testtar.tar.bz2
241 bz2.BZ2File(tarname("bz2"), "wb").write(file(tarname(), "rb").read())
243 try:
244 suite = unittest.TestSuite()
246 suite.addTest(unittest.makeSuite(ReadTest))
247 suite.addTest(unittest.makeSuite(ReadStreamTest))
248 suite.addTest(unittest.makeSuite(WriteTest))
249 suite.addTest(unittest.makeSuite(WriteStreamTest))
251 if gzip:
252 suite.addTest(unittest.makeSuite(ReadTestGzip))
253 suite.addTest(unittest.makeSuite(ReadStreamTestGzip))
254 suite.addTest(unittest.makeSuite(WriteTestGzip))
255 suite.addTest(unittest.makeSuite(WriteStreamTestGzip))
257 if bz2:
258 suite.addTest(unittest.makeSuite(ReadTestBzip2))
259 suite.addTest(unittest.makeSuite(ReadStreamTestBzip2))
260 suite.addTest(unittest.makeSuite(WriteTestBzip2))
261 suite.addTest(unittest.makeSuite(WriteStreamTestBzip2))
263 test_support.run_suite(suite)
264 finally:
265 if gzip:
266 os.remove(tarname("gz"))
267 if bz2:
268 os.remove(tarname("bz2"))
269 if os.path.exists(tempdir):
270 shutil.rmtree(tempdir)
271 if os.path.exists(tempname):
272 os.remove(tempname)
274 if __name__ == "__main__":
275 test_main()