This commit was manufactured by cvs2svn to create tag 'r234c1'.
[python/dscho.git] / Lib / test / test_zlib.py
blob3ad255f8bff5e8071288e77a776f162b80f204f2
1 import unittest
2 from test import test_support
3 import zlib
4 import random
6 # print test_support.TESTFN
8 def getbuf():
9 # This was in the original. Avoid non-repeatable sources.
10 # Left here (unused) in case something wants to be done with it.
11 import imp
12 try:
13 t = imp.find_module('test_zlib')
14 file = t[0]
15 except ImportError:
16 file = open(__file__)
17 buf = file.read() * 8
18 file.close()
19 return buf
23 class ChecksumTestCase(unittest.TestCase):
24 # checksum test cases
25 def test_crc32start(self):
26 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
28 def test_crc32empty(self):
29 self.assertEqual(zlib.crc32("", 0), 0)
30 self.assertEqual(zlib.crc32("", 1), 1)
31 self.assertEqual(zlib.crc32("", 432), 432)
33 def test_adler32start(self):
34 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
36 def test_adler32empty(self):
37 self.assertEqual(zlib.adler32("", 0), 0)
38 self.assertEqual(zlib.adler32("", 1), 1)
39 self.assertEqual(zlib.adler32("", 432), 432)
41 def assertEqual32(self, seen, expected):
42 # 32-bit values masked -- checksums on 32- vs 64- bit machines
43 # This is important if bit 31 (0x08000000L) is set.
44 self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
46 def test_penguins(self):
47 self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
48 self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
49 self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
50 self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
52 self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
53 self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
57 class ExceptionTestCase(unittest.TestCase):
58 # make sure we generate some expected errors
59 def test_bigbits(self):
60 # specifying total bits too large causes an error
61 self.assertRaises(zlib.error,
62 zlib.compress, 'ERROR', zlib.MAX_WBITS + 1)
64 def test_badcompressobj(self):
65 # verify failure on building compress object with bad params
66 self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0)
68 def test_baddecompressobj(self):
69 # verify failure on building decompress object with bad params
70 self.assertRaises(ValueError, zlib.decompressobj, 0)
74 class CompressTestCase(unittest.TestCase):
75 # Test compression in one go (whole message compression)
76 def test_speech(self):
77 # decompress(compress(data)) better be data
78 x = zlib.compress(hamlet_scene)
79 self.assertEqual(zlib.decompress(x), hamlet_scene)
81 def test_speech8(self):
82 # decompress(compress(data)) better be data -- more compression chances
83 data = hamlet_scene * 8
84 x = zlib.compress(data)
85 self.assertEqual(zlib.decompress(x), data)
87 def test_speech16(self):
88 # decompress(compress(data)) better be data -- more compression chances
89 data = hamlet_scene * 16
90 x = zlib.compress(data)
91 self.assertEqual(zlib.decompress(x), data)
93 def test_speech128(self):
94 # decompress(compress(data)) better be data -- more compression chances
95 data = hamlet_scene * 8 * 16
96 x = zlib.compress(data)
97 self.assertEqual(zlib.decompress(x), data)
102 class CompressObjectTestCase(unittest.TestCase):
103 # Test compression object
104 def test_pairsmall(self):
105 # use compress object in straightforward manner, decompress w/ object
106 data = hamlet_scene
107 co = zlib.compressobj(8, 8, -15)
108 x1 = co.compress(data)
109 x2 = co.flush()
110 self.assertRaises(zlib.error, co.flush) # second flush should not work
111 dco = zlib.decompressobj(-15)
112 y1 = dco.decompress(x1 + x2)
113 y2 = dco.flush()
114 self.assertEqual(data, y1 + y2)
116 def test_pair(self):
117 # straightforward compress/decompress objects, more compression
118 data = hamlet_scene * 8 * 16
119 co = zlib.compressobj(8, 8, -15)
120 x1 = co.compress(data)
121 x2 = co.flush()
122 self.assertRaises(zlib.error, co.flush) # second flush should not work
123 dco = zlib.decompressobj(-15)
124 y1 = dco.decompress(x1 + x2)
125 y2 = dco.flush()
126 self.assertEqual(data, y1 + y2)
128 def test_compressincremental(self):
129 # compress object in steps, decompress object as one-shot
130 data = hamlet_scene * 8 * 16
131 co = zlib.compressobj(2, 8, -12, 9, 1)
132 bufs = []
133 for i in range(0, len(data), 256):
134 bufs.append(co.compress(data[i:i+256]))
135 bufs.append(co.flush())
136 combuf = ''.join(bufs)
138 dco = zlib.decompressobj(-15)
139 y1 = dco.decompress(''.join(bufs))
140 y2 = dco.flush()
141 self.assertEqual(data, y1 + y2)
143 def test_decompressincremental(self):
144 # compress object in steps, decompress object in steps
145 data = hamlet_scene * 8 * 16
146 co = zlib.compressobj(2, 8, -12, 9, 1)
147 bufs = []
148 for i in range(0, len(data), 256):
149 bufs.append(co.compress(data[i:i+256]))
150 bufs.append(co.flush())
151 combuf = ''.join(bufs)
153 self.assertEqual(data, zlib.decompress(combuf, -12, -5))
155 dco = zlib.decompressobj(-12)
156 bufs = []
157 for i in range(0, len(combuf), 128):
158 bufs.append(dco.decompress(combuf[i:i+128]))
159 self.assertEqual('', dco.unconsumed_tail, ########
160 "(A) uct should be '': not %d long" %
161 len(dco.unconsumed_tail))
162 bufs.append(dco.flush())
163 self.assertEqual('', dco.unconsumed_tail, ########
164 "(B) uct should be '': not %d long" %
165 len(dco.unconsumed_tail))
166 self.assertEqual(data, ''.join(bufs))
167 # Failure means: "decompressobj with init options failed"
169 def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
170 # compress object in steps, decompress object in steps, loop sizes
171 source = source or hamlet_scene
172 for reps in sizes:
173 data = source * reps
174 co = zlib.compressobj(2, 8, -12, 9, 1)
175 bufs = []
176 for i in range(0, len(data), cx):
177 bufs.append(co.compress(data[i:i+cx]))
178 bufs.append(co.flush())
179 combuf = ''.join(bufs)
181 self.assertEqual(data, zlib.decompress(combuf, -12, -5))
183 dco = zlib.decompressobj(-12)
184 bufs = []
185 for i in range(0, len(combuf), dcx):
186 bufs.append(dco.decompress(combuf[i:i+dcx]))
187 self.assertEqual('', dco.unconsumed_tail, ########
188 "(A) uct should be '': not %d long" %
189 len(dco.unconsumed_tail))
190 if flush:
191 bufs.append(dco.flush())
192 else:
193 while True:
194 chunk = dco.decompress('')
195 if chunk:
196 bufs.append(chunk)
197 else:
198 break
199 self.assertEqual('', dco.unconsumed_tail, ########
200 "(B) uct should be '': not %d long" %
201 len(dco.unconsumed_tail))
202 self.assertEqual(data, ''.join(bufs))
203 # Failure means: "decompressobj with init options failed"
205 def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
206 # compress in steps, decompress in length-restricted steps, loop sizes
207 source = source or hamlet_scene
208 for reps in sizes:
209 # Check a decompression object with max_length specified
210 data = source * reps
211 co = zlib.compressobj(2, 8, -12, 9, 1)
212 bufs = []
213 for i in range(0, len(data), cx):
214 bufs.append(co.compress(data[i:i+cx]))
215 bufs.append(co.flush())
216 combuf = ''.join(bufs)
217 self.assertEqual(data, zlib.decompress(combuf, -12, -5),
218 'compressed data failure')
220 dco = zlib.decompressobj(-12)
221 bufs = []
222 cb = combuf
223 while cb:
224 #max_length = 1 + len(cb)//10
225 chunk = dco.decompress(cb, dcx)
226 self.failIf(len(chunk) > dcx,
227 'chunk too big (%d>%d)' % (len(chunk), dcx))
228 bufs.append(chunk)
229 cb = dco.unconsumed_tail
230 if flush:
231 bufs.append(dco.flush())
232 else:
233 while True:
234 chunk = dco.decompress('', dcx)
235 self.failIf(len(chunk) > dcx,
236 'chunk too big in tail (%d>%d)' % (len(chunk), dcx))
237 if chunk:
238 bufs.append(chunk)
239 else:
240 break
241 self.assertEqual(len(data), len(''.join(bufs)))
242 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
244 def test_decompressmaxlen(self):
245 # Check a decompression object with max_length specified
246 data = hamlet_scene * 8 * 16
247 co = zlib.compressobj(2, 8, -12, 9, 1)
248 bufs = []
249 for i in range(0, len(data), 256):
250 bufs.append(co.compress(data[i:i+256]))
251 bufs.append(co.flush())
252 combuf = ''.join(bufs)
253 self.assertEqual(data, zlib.decompress(combuf, -12, -5),
254 'compressed data failure')
256 dco = zlib.decompressobj(-12)
257 bufs = []
258 cb = combuf
259 while cb:
260 max_length = 1 + len(cb)//10
261 chunk = dco.decompress(cb, max_length)
262 self.failIf(len(chunk) > max_length,
263 'chunk too big (%d>%d)' % (len(chunk),max_length))
264 bufs.append(chunk)
265 cb = dco.unconsumed_tail
266 bufs.append(dco.flush())
267 self.assertEqual(len(data), len(''.join(bufs)))
268 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
270 def test_decompressmaxlenflushless(self):
271 # identical to test_decompressmaxlen except flush is replaced
272 # with an equivalent. This works and other fails on (eg) 2.2.2
273 data = hamlet_scene * 8 * 16
274 co = zlib.compressobj(2, 8, -12, 9, 1)
275 bufs = []
276 for i in range(0, len(data), 256):
277 bufs.append(co.compress(data[i:i+256]))
278 bufs.append(co.flush())
279 combuf = ''.join(bufs)
280 self.assertEqual(data, zlib.decompress(combuf, -12, -5),
281 'compressed data mismatch')
283 dco = zlib.decompressobj(-12)
284 bufs = []
285 cb = combuf
286 while cb:
287 max_length = 1 + len(cb)//10
288 chunk = dco.decompress(cb, max_length)
289 self.failIf(len(chunk) > max_length,
290 'chunk too big (%d>%d)' % (len(chunk),max_length))
291 bufs.append(chunk)
292 cb = dco.unconsumed_tail
294 #bufs.append(dco.flush())
295 while len(chunk):
296 chunk = dco.decompress('', max_length)
297 self.failIf(len(chunk) > max_length,
298 'chunk too big (%d>%d)' % (len(chunk),max_length))
299 bufs.append(chunk)
301 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
303 def test_maxlenmisc(self):
304 # Misc tests of max_length
305 dco = zlib.decompressobj(-12)
306 self.assertRaises(ValueError, dco.decompress, "", -1)
307 self.assertEqual('', dco.unconsumed_tail)
309 def test_flushes(self):
310 # Test flush() with the various options, using all the
311 # different levels in order to provide more variations.
312 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
313 sync_opt = [getattr(zlib, opt) for opt in sync_opt
314 if hasattr(zlib, opt)]
315 data = hamlet_scene * 8
317 for sync in sync_opt:
318 for level in range(10):
319 obj = zlib.compressobj( level )
320 a = obj.compress( data[:3000] )
321 b = obj.flush( sync )
322 c = obj.compress( data[3000:] )
323 d = obj.flush()
324 self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
325 data, ("Decompress failed: flush "
326 "mode=%i, level=%i") % (sync, level))
327 del obj
329 def test_odd_flush(self):
330 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
331 import random
333 if hasattr(zlib, 'Z_SYNC_FLUSH'):
334 # Testing on 17K of "random" data
336 # Create compressor and decompressor objects
337 co = zlib.compressobj(9)
338 dco = zlib.decompressobj()
340 # Try 17K of data
341 # generate random data stream
342 try:
343 # In 2.3 and later, WichmannHill is the RNG of the bug report
344 gen = random.WichmannHill()
345 except AttributeError:
346 try:
347 # 2.2 called it Random
348 gen = random.Random()
349 except AttributeError:
350 # others might simply have a single RNG
351 gen = random
352 gen.seed(1)
353 data = genblock(1, 17 * 1024, generator=gen)
355 # compress, sync-flush, and decompress
356 first = co.compress(data)
357 second = co.flush(zlib.Z_SYNC_FLUSH)
358 expanded = dco.decompress(first + second)
360 # if decompressed data is different from the input data, choke.
361 self.assertEqual(expanded, data, "17K random source doesn't match")
363 def test_manydecompinc(self):
364 # Run incremental decompress test for a large range of sizes
365 self.test_decompinc(sizes=[1<<n for n in range(8)],
366 flush=True, cx=32, dcx=4)
368 def test_manydecompimax(self):
369 # Run incremental decompress maxlen test for a large range of sizes
370 # avoid the flush bug
371 self.test_decompimax(sizes=[1<<n for n in range(8)],
372 flush=False, cx=32, dcx=4)
374 def test_manydecompimaxflush(self):
375 # Run incremental decompress maxlen test for a large range of sizes
376 # avoid the flush bug
377 self.test_decompimax(sizes=[1<<n for n in range(8)],
378 flush=True, cx=32, dcx=4)
381 def genblock(seed, length, step=1024, generator=random):
382 """length-byte stream of random data from a seed (in step-byte blocks)."""
383 if seed is not None:
384 generator.seed(seed)
385 randint = generator.randint
386 if length < step or step < 2:
387 step = length
388 blocks = []
389 for i in range(0, length, step):
390 blocks.append(''.join([chr(randint(0,255))
391 for x in range(step)]))
392 return ''.join(blocks)[:length]
396 def choose_lines(source, number, seed=None, generator=random):
397 """Return a list of number lines randomly chosen from the source"""
398 if seed is not None:
399 generator.seed(seed)
400 sources = source.split('\n')
401 return [generator.choice(sources) for n in range(number)]
405 hamlet_scene = """
406 LAERTES
408 O, fear me not.
409 I stay too long: but here my father comes.
411 Enter POLONIUS
413 A double blessing is a double grace,
414 Occasion smiles upon a second leave.
416 LORD POLONIUS
418 Yet here, Laertes! aboard, aboard, for shame!
419 The wind sits in the shoulder of your sail,
420 And you are stay'd for. There; my blessing with thee!
421 And these few precepts in thy memory
422 See thou character. Give thy thoughts no tongue,
423 Nor any unproportioned thought his act.
424 Be thou familiar, but by no means vulgar.
425 Those friends thou hast, and their adoption tried,
426 Grapple them to thy soul with hoops of steel;
427 But do not dull thy palm with entertainment
428 Of each new-hatch'd, unfledged comrade. Beware
429 Of entrance to a quarrel, but being in,
430 Bear't that the opposed may beware of thee.
431 Give every man thy ear, but few thy voice;
432 Take each man's censure, but reserve thy judgment.
433 Costly thy habit as thy purse can buy,
434 But not express'd in fancy; rich, not gaudy;
435 For the apparel oft proclaims the man,
436 And they in France of the best rank and station
437 Are of a most select and generous chief in that.
438 Neither a borrower nor a lender be;
439 For loan oft loses both itself and friend,
440 And borrowing dulls the edge of husbandry.
441 This above all: to thine ownself be true,
442 And it must follow, as the night the day,
443 Thou canst not then be false to any man.
444 Farewell: my blessing season this in thee!
446 LAERTES
448 Most humbly do I take my leave, my lord.
450 LORD POLONIUS
452 The time invites you; go; your servants tend.
454 LAERTES
456 Farewell, Ophelia; and remember well
457 What I have said to you.
459 OPHELIA
461 'Tis in my memory lock'd,
462 And you yourself shall keep the key of it.
464 LAERTES
466 Farewell.
470 def test_main():
471 test_support.run_unittest(
472 ChecksumTestCase,
473 ExceptionTestCase,
474 CompressTestCase,
475 CompressObjectTestCase
478 if __name__ == "__main__":
479 test_main()
481 def test(tests=''):
482 if not tests: tests = 'o'
483 testcases = []
484 if 'k' in tests: testcases.append(ChecksumTestCase)
485 if 'x' in tests: testcases.append(ExceptionTestCase)
486 if 'c' in tests: testcases.append(CompressTestCase)
487 if 'o' in tests: testcases.append(CompressObjectTestCase)
488 test_support.run_unittest(*testcases)
490 if False:
491 import sys
492 sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
493 import test_zlib as tz
494 ts, ut = tz.test_support, tz.unittest
495 su = ut.TestSuite()
496 su.addTest(ut.makeSuite(tz.CompressTestCase))
497 ts.run_suite(su)