_make_boundary(): Fix for SF bug #745478, broken boundary calculation
[python/dscho.git] / Lib / test / test_zlib.py
blob c703964e3f35fbf85c9bb9d0677194d1e7c19c46
1 import unittest
2 from test import test_support
3 import zlib
4 import random
6 # print test_support.TESTFN
def getbuf():
    """Return the text of this test's source file, repeated eight times.

    Kept from the original test suite but unused by the tests in this
    file, which avoid non-repeatable input sources.  The source is
    looked up with imp.find_module('test_zlib'); if that lookup (or the
    import of imp itself) raises ImportError, __file__ is opened instead.
    """
    try:
        import imp
        source_file = imp.find_module('test_zlib')[0]
    except ImportError:
        source_file = open(__file__)
    # try/finally so the handle is released even when read() raises.
    try:
        return source_file.read() * 8
    finally:
        source_file.close()
23 class ChecksumTestCase(unittest.TestCase):
24 # checksum test cases
25 def test_crc32start(self):
26 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
28 def test_crc32empty(self):
29 self.assertEqual(zlib.crc32("", 0), 0)
30 self.assertEqual(zlib.crc32("", 1), 1)
31 self.assertEqual(zlib.crc32("", 432), 432)
33 def test_adler32start(self):
34 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
36 def test_adler32empty(self):
37 self.assertEqual(zlib.adler32("", 0), 0)
38 self.assertEqual(zlib.adler32("", 1), 1)
39 self.assertEqual(zlib.adler32("", 432), 432)
41 def assertEqual32(self, seen, expected):
42 # 32-bit values masked -- checksums on 32- vs 64- bit machines
43 # This is important if bit 31 (0x08000000L) is set.
44 self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
46 def test_penguins(self):
47 self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
48 self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
49 self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
50 self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
52 self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
53 self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
57 class ExceptionTestCase(unittest.TestCase):
58 # make sure we generate some expected errors
59 def test_bigbits(self):
60 # specifying total bits too large causes an error
61 self.assertRaises(zlib.error,
62 zlib.compress, 'ERROR', zlib.MAX_WBITS + 1)
64 def test_badcompressobj(self):
65 # verify failure on building compress object with bad params
66 self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0)
68 def test_baddecompressobj(self):
69 # verify failure on building decompress object with bad params
70 self.assertRaises(ValueError, zlib.decompressobj, 0)
class CompressTestCase(unittest.TestCase):
    # Whole-message compression with zlib.compress()/zlib.decompress().

    def _check_roundtrip(self, data):
        # decompress(compress(data)) better be data
        packed = zlib.compress(data)
        self.assertEqual(zlib.decompress(packed), data)

    def test_speech(self):
        # round trip of a single copy of the scene
        self._check_roundtrip(hamlet_scene)

    def test_speech8(self):
        # round trip with repeated text -- more compression chances
        self._check_roundtrip(hamlet_scene * 8)

    def test_speech16(self):
        # round trip with repeated text -- more compression chances
        self._check_roundtrip(hamlet_scene * 16)

    def test_speech128(self):
        # round trip with repeated text -- more compression chances
        self._check_roundtrip(hamlet_scene * 8 * 16)

    def test_monotonic(self):
        # higher compression levels should not expand compressed size
        data = hamlet_scene * 8 * 16
        last = len(zlib.compress(data, 0))
        self.failUnless(last > len(data), "compress level 0 always expands")
        # Level 0 is the baseline measured above, so comparisons start at
        # level 1; this also keeps "level -1" out of the failure message.
        for level in range(1, 10):
            length = len(zlib.compress(data, level))
            self.failUnless(length <= last,
                    'compress level %d more effective than %d!' % (
                                            level-1, level))
            last = length
class CompressObjectTestCase(unittest.TestCase):
    # Tests for the streaming interface: zlib.compressobj() and
    # zlib.decompressobj().

    def _compress_in_steps(self, data, step=256):
        # Helper shared by the incremental tests: push data through one
        # compressobj(2, 8, -12, 9, 1), step bytes per call, and return
        # the concatenated output including the final flush().
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = [co.compress(data[i:i+step])
                for i in range(0, len(data), step)]
        bufs.append(co.flush())
        return ''.join(bufs)

    def _check_pair(self, data):
        # Helper: compress data with one object, decompress it with
        # another, and require the original back.
        co = zlib.compressobj(8, 8, -15)
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush)  # second flush should not work
        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_pairsmall(self):
        # use compress object in straightforward manner, decompress w/ object
        self._check_pair(hamlet_scene)

    def test_pair(self):
        # straightforward compress/decompress objects, more compression
        self._check_pair(hamlet_scene * 8 * 16)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = hamlet_scene * 8 * 16
        combuf = self._compress_in_steps(data)

        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompressincremental(self):
        # compress object in steps, decompress object in steps.
        # This is test_decompinc for 8 * 16 repetitions, fed to the
        # decompressor 128 bytes at a time and finished with flush().
        self.test_decompinc(sizes=[8 * 16], flush=True, cx=256, dcx=128)

    def test_decompinc(self, sizes=(128,), flush=True, source=None,
                       cx=256, dcx=64):
        # compress object in steps, decompress object in steps, loop sizes
        #   sizes  -- repetition counts applied to source, one run each
        #   flush  -- finish with dco.flush() if true, otherwise drain by
        #             calling dco.decompress('') until it returns nothing
        #   source -- text to repeat; hamlet_scene when not given
        #   cx/dcx -- bytes fed per compress / decompress call
        source = source or hamlet_scene
        for reps in sizes:
            data = source * reps
            combuf = self._compress_in_steps(data, cx)

            self.assertEqual(data, zlib.decompress(combuf, -12, -5))

            dco = zlib.decompressobj(-12)
            bufs = []
            for i in range(0, len(combuf), dcx):
                bufs.append(dco.decompress(combuf[i:i+dcx]))
                self.assertEqual('', dco.unconsumed_tail, ########
                                 "(A) uct should be '': not %d long" %
                                 len(dco.unconsumed_tail))
            if flush:
                bufs.append(dco.flush())
            else:
                while True:
                    chunk = dco.decompress('')
                    if chunk:
                        bufs.append(chunk)
                    else:
                        break
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(B) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(data, ''.join(bufs))
            # Failure means: "decompressobj with init options failed"

    def test_decompimax(self, sizes=(128,), flush=True, source=None,
                        cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps, loop sizes
        # Parameters are as for test_decompinc, except that dcx is the
        # max_length passed to every dco.decompress() call.
        source = source or hamlet_scene
        for reps in sizes:
            # Check a decompression object with max_length specified
            data = source * reps
            combuf = self._compress_in_steps(data, cx)
            self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                             'compressed data failure')

            dco = zlib.decompressobj(-12)
            bufs = []
            cb = combuf
            while cb:
                chunk = dco.decompress(cb, dcx)
                self.failIf(len(chunk) > dcx,
                        'chunk too big (%d>%d)' % (len(chunk), dcx))
                bufs.append(chunk)
                # input held back by the limit is offered again next time
                cb = dco.unconsumed_tail
            if flush:
                bufs.append(dco.flush())
            else:
                while True:
                    chunk = dco.decompress('', dcx)
                    self.failIf(len(chunk) > dcx,
                            'chunk too big in tail (%d>%d)' % (len(chunk), dcx))
                    if chunk:
                        bufs.append(chunk)
                    else:
                        break
            result = ''.join(bufs)
            self.assertEqual(len(data), len(result))
            self.assertEqual(data, result, 'Wrong data retrieved')

    def test_decompressmaxlen(self):
        # Check a decompression object with max_length specified
        data = hamlet_scene * 8 * 16
        combuf = self._compress_in_steps(data)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                         'compressed data failure')

        dco = zlib.decompressobj(-12)
        bufs = []
        cb = combuf
        while cb:
            # the limit shrinks together with the input still pending
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        result = ''.join(bufs)
        self.assertEqual(len(data), len(result))
        self.assertEqual(data, result, 'Wrong data retrieved')

    def test_decompressmaxlenflushless(self):
        # identical to test_decompressmaxlen except flush is replaced
        # with an equivalent.  This works and other fails on (eg) 2.2.2
        data = hamlet_scene * 8 * 16
        combuf = self._compress_in_steps(data)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                         'compressed data mismatch')

        dco = zlib.decompressobj(-12)
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail

        # Stand-in for dco.flush(): keep asking for output, with the last
        # limit used above, until an empty chunk comes back.
        while len(chunk):
            chunk = dco.decompress('', max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)

        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj(-12)
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        # only the flush modes this zlib module actually defines
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = hamlet_scene * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj(level)
                a = obj.compress(data[:3000])
                b = obj.flush(sync)
                c = obj.compress(data[3000:])
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        if not hasattr(zlib, 'Z_SYNC_FLUSH'):
            # nothing to exercise without sync flush support
            return

        # Create compressor and decompressor objects
        co = zlib.compressobj(9)
        dco = zlib.decompressobj()

        # Pick the random generator
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random

        # 17K of "random" data; genblock() seeds the generator with 1
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_manydecompinc(self):
        # Run incremental decompress test for a large range of sizes
        self.test_decompinc(sizes=[1<<n for n in range(8)],
                            flush=True, cx=32, dcx=4)

    def test_manydecompimax(self):
        # Run incremental decompress maxlen test for a large range of sizes
        # avoid the flush bug
        self.test_decompimax(sizes=[1<<n for n in range(8)],
                             flush=False, cx=32, dcx=4)

    def test_manydecompimaxflush(self):
        # Run incremental decompress maxlen test for a large range of sizes,
        # this time finishing each run with flush()
        self.test_decompimax(sizes=[1<<n for n in range(8)],
                             flush=True, cx=32, dcx=4)
def genblock(seed, length, step=1024, generator=random):
    """Return a length-byte string of pseudo-random data.

    seed      -- passed to generator.seed() unless it is None, in which
                 case the generator's current state is used.
    length    -- number of bytes wanted; zero or less yields ''.
    step      -- bytes generated per internal block.  A step below 2, or
                 one larger than length, is replaced by length so that a
                 single block is built.
    generator -- object providing seed() and randint(); defaults to the
                 random module.
    """
    if seed is not None:
        generator.seed(seed)
    # A zero-length request used to reach range(0, 0, 0), which raises
    # ValueError.  Answer it directly, after the seeding above so that
    # side effect is unchanged.
    if length <= 0:
        return ''
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for start in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for _ in range(step)]))
    # Whole blocks are generated, so trim the final partial one.
    return ''.join(blocks)[:length]
def choose_lines(source, number, seed=None, generator=random):
    """Pick number lines at random from source and return them as a list.

    source is split on newlines and each pick is an independent
    generator.choice() call, so a line may appear more than once.  The
    generator is seeded first when seed is not None.
    """
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    pick = generator.choice
    chosen = []
    for _ in range(number):
        chosen.append(pick(candidates))
    return chosen
416 hamlet_scene = """
417 LAERTES
419 O, fear me not.
420 I stay too long: but here my father comes.
422 Enter POLONIUS
424 A double blessing is a double grace,
425 Occasion smiles upon a second leave.
427 LORD POLONIUS
429 Yet here, Laertes! aboard, aboard, for shame!
430 The wind sits in the shoulder of your sail,
431 And you are stay'd for. There; my blessing with thee!
432 And these few precepts in thy memory
433 See thou character. Give thy thoughts no tongue,
434 Nor any unproportioned thought his act.
435 Be thou familiar, but by no means vulgar.
436 Those friends thou hast, and their adoption tried,
437 Grapple them to thy soul with hoops of steel;
438 But do not dull thy palm with entertainment
439 Of each new-hatch'd, unfledged comrade. Beware
440 Of entrance to a quarrel, but being in,
441 Bear't that the opposed may beware of thee.
442 Give every man thy ear, but few thy voice;
443 Take each man's censure, but reserve thy judgment.
444 Costly thy habit as thy purse can buy,
445 But not express'd in fancy; rich, not gaudy;
446 For the apparel oft proclaims the man,
447 And they in France of the best rank and station
448 Are of a most select and generous chief in that.
449 Neither a borrower nor a lender be;
450 For loan oft loses both itself and friend,
451 And borrowing dulls the edge of husbandry.
452 This above all: to thine ownself be true,
453 And it must follow, as the night the day,
454 Thou canst not then be false to any man.
455 Farewell: my blessing season this in thee!
457 LAERTES
459 Most humbly do I take my leave, my lord.
461 LORD POLONIUS
463 The time invites you; go; your servants tend.
465 LAERTES
467 Farewell, Ophelia; and remember well
468 What I have said to you.
470 OPHELIA
472 'Tis in my memory lock'd,
473 And you yourself shall keep the key of it.
475 LAERTES
477 Farewell.
def test_main():
    """Run all four test case classes through test_support.run_unittest()."""
    all_cases = (
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase,
    )
    test_support.run_unittest(*all_cases)
# Run the complete suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
def test(tests=''):
    """Run a chosen subset of the test case classes.

    tests is a string of selector letters: 'k' ChecksumTestCase,
    'x' ExceptionTestCase, 'c' CompressTestCase and
    'o' CompressObjectTestCase.  An empty value selects 'o' only.
    """
    wanted = tests or 'o'
    registry = [
        ('k', ChecksumTestCase),
        ('x', ExceptionTestCase),
        ('c', CompressTestCase),
        ('o', CompressObjectTestCase),
    ]
    testcases = [case for letter, case in registry if letter in wanted]
    test_support.run_unittest(*testcases)
# Disabled scratch snippet: the guard is the constant False, so none of
# this ever runs.  It would import test_zlib from a hard-coded source-tree
# path and run only CompressTestCase through test_support.run_suite().
if False:
    import sys
    sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
    import test_zlib as tz
    ts, ut = tz.test_support, tz.unittest
    su = ut.TestSuite()
    su.addTest(ut.makeSuite(tz.CompressTestCase))
    ts.run_suite(su)