2 from test
import test_support
6 # print test_support.TESTFN
9 # This was in the original. Avoid non-repeatable sources.
10 # Left here (unused) in case something wants to be done with it.
13 t
= imp
.find_module('test_zlib')
23 class ChecksumTestCase(unittest
.TestCase
):
25 def test_crc32start(self
):
26 self
.assertEqual(zlib
.crc32(""), zlib
.crc32("", 0))
28 def test_crc32empty(self
):
29 self
.assertEqual(zlib
.crc32("", 0), 0)
30 self
.assertEqual(zlib
.crc32("", 1), 1)
31 self
.assertEqual(zlib
.crc32("", 432), 432)
33 def test_adler32start(self
):
34 self
.assertEqual(zlib
.adler32(""), zlib
.adler32("", 1))
36 def test_adler32empty(self
):
37 self
.assertEqual(zlib
.adler32("", 0), 0)
38 self
.assertEqual(zlib
.adler32("", 1), 1)
39 self
.assertEqual(zlib
.adler32("", 432), 432)
41 def assertEqual32(self
, seen
, expected
):
42 # 32-bit values masked -- checksums on 32- vs 64- bit machines
43 # This is important if bit 31 (0x08000000L) is set.
44 self
.assertEqual(seen
& 0x0FFFFFFFFL
, expected
& 0x0FFFFFFFFL
)
46 def test_penguins(self
):
47 self
.assertEqual32(zlib
.crc32("penguin", 0), 0x0e5c1a120L
)
48 self
.assertEqual32(zlib
.crc32("penguin", 1), 0x43b6aa94)
49 self
.assertEqual32(zlib
.adler32("penguin", 0), 0x0bcf02f6)
50 self
.assertEqual32(zlib
.adler32("penguin", 1), 0x0bd602f7)
52 self
.assertEqual(zlib
.crc32("penguin"), zlib
.crc32("penguin", 0))
53 self
.assertEqual(zlib
.adler32("penguin"),zlib
.adler32("penguin",1))
57 class ExceptionTestCase(unittest
.TestCase
):
58 # make sure we generate some expected errors
59 def test_bigbits(self
):
60 # specifying total bits too large causes an error
61 self
.assertRaises(zlib
.error
,
62 zlib
.compress
, 'ERROR', zlib
.MAX_WBITS
+ 1)
64 def test_badcompressobj(self
):
65 # verify failure on building compress object with bad params
66 self
.assertRaises(ValueError, zlib
.compressobj
, 1, 8, 0)
68 def test_baddecompressobj(self
):
69 # verify failure on building decompress object with bad params
70 self
.assertRaises(ValueError, zlib
.decompressobj
, 0)
74 class CompressTestCase(unittest
.TestCase
):
75 # Test compression in one go (whole message compression)
76 def test_speech(self
):
77 # decompress(compress(data)) better be data
78 x
= zlib
.compress(hamlet_scene
)
79 self
.assertEqual(zlib
.decompress(x
), hamlet_scene
)
81 def test_speech8(self
):
82 # decompress(compress(data)) better be data -- more compression chances
83 data
= hamlet_scene
* 8
84 x
= zlib
.compress(data
)
85 self
.assertEqual(zlib
.decompress(x
), data
)
87 def test_speech16(self
):
88 # decompress(compress(data)) better be data -- more compression chances
89 data
= hamlet_scene
* 16
90 x
= zlib
.compress(data
)
91 self
.assertEqual(zlib
.decompress(x
), data
)
93 def test_speech128(self
):
94 # decompress(compress(data)) better be data -- more compression chances
95 data
= hamlet_scene
* 8 * 16
96 x
= zlib
.compress(data
)
97 self
.assertEqual(zlib
.decompress(x
), data
)
102 class CompressObjectTestCase(unittest
.TestCase
):
103 # Test compression object
104 def test_pairsmall(self
):
105 # use compress object in straightforward manner, decompress w/ object
107 co
= zlib
.compressobj(8, 8, -15)
108 x1
= co
.compress(data
)
110 self
.assertRaises(zlib
.error
, co
.flush
) # second flush should not work
111 dco
= zlib
.decompressobj(-15)
112 y1
= dco
.decompress(x1
+ x2
)
114 self
.assertEqual(data
, y1
+ y2
)
117 # straightforward compress/decompress objects, more compression
118 data
= hamlet_scene
* 8 * 16
119 co
= zlib
.compressobj(8, 8, -15)
120 x1
= co
.compress(data
)
122 self
.assertRaises(zlib
.error
, co
.flush
) # second flush should not work
123 dco
= zlib
.decompressobj(-15)
124 y1
= dco
.decompress(x1
+ x2
)
126 self
.assertEqual(data
, y1
+ y2
)
128 def test_compressincremental(self
):
129 # compress object in steps, decompress object as one-shot
130 data
= hamlet_scene
* 8 * 16
131 co
= zlib
.compressobj(2, 8, -12, 9, 1)
133 for i
in range(0, len(data
), 256):
134 bufs
.append(co
.compress(data
[i
:i
+256]))
135 bufs
.append(co
.flush())
136 combuf
= ''.join(bufs
)
138 dco
= zlib
.decompressobj(-15)
139 y1
= dco
.decompress(''.join(bufs
))
141 self
.assertEqual(data
, y1
+ y2
)
143 def test_decompressincremental(self
):
144 # compress object in steps, decompress object in steps
145 data
= hamlet_scene
* 8 * 16
146 co
= zlib
.compressobj(2, 8, -12, 9, 1)
148 for i
in range(0, len(data
), 256):
149 bufs
.append(co
.compress(data
[i
:i
+256]))
150 bufs
.append(co
.flush())
151 combuf
= ''.join(bufs
)
153 self
.assertEqual(data
, zlib
.decompress(combuf
, -12, -5))
155 dco
= zlib
.decompressobj(-12)
157 for i
in range(0, len(combuf
), 128):
158 bufs
.append(dco
.decompress(combuf
[i
:i
+128]))
159 self
.assertEqual('', dco
.unconsumed_tail
, ########
160 "(A) uct should be '': not %d long" %
161 len(dco
.unconsumed_tail
))
162 bufs
.append(dco
.flush())
163 self
.assertEqual('', dco
.unconsumed_tail
, ########
164 "(B) uct should be '': not %d long" %
165 len(dco
.unconsumed_tail
))
166 self
.assertEqual(data
, ''.join(bufs
))
167 # Failure means: "decompressobj with init options failed"
169 def test_decompinc(self
,sizes
=[128],flush
=True,source
=None,cx
=256,dcx
=64):
170 # compress object in steps, decompress object in steps, loop sizes
171 source
= source
or hamlet_scene
174 co
= zlib
.compressobj(2, 8, -12, 9, 1)
176 for i
in range(0, len(data
), cx
):
177 bufs
.append(co
.compress(data
[i
:i
+cx
]))
178 bufs
.append(co
.flush())
179 combuf
= ''.join(bufs
)
181 self
.assertEqual(data
, zlib
.decompress(combuf
, -12, -5))
183 dco
= zlib
.decompressobj(-12)
185 for i
in range(0, len(combuf
), dcx
):
186 bufs
.append(dco
.decompress(combuf
[i
:i
+dcx
]))
187 self
.assertEqual('', dco
.unconsumed_tail
, ########
188 "(A) uct should be '': not %d long" %
189 len(dco
.unconsumed_tail
))
191 bufs
.append(dco
.flush())
194 chunk
= dco
.decompress('')
199 self
.assertEqual('', dco
.unconsumed_tail
, ########
200 "(B) uct should be '': not %d long" %
201 len(dco
.unconsumed_tail
))
202 self
.assertEqual(data
, ''.join(bufs
))
203 # Failure means: "decompressobj with init options failed"
205 def test_decompimax(self
,sizes
=[128],flush
=True,source
=None,cx
=256,dcx
=64):
206 # compress in steps, decompress in length-restricted steps, loop sizes
207 source
= source
or hamlet_scene
209 # Check a decompression object with max_length specified
211 co
= zlib
.compressobj(2, 8, -12, 9, 1)
213 for i
in range(0, len(data
), cx
):
214 bufs
.append(co
.compress(data
[i
:i
+cx
]))
215 bufs
.append(co
.flush())
216 combuf
= ''.join(bufs
)
217 self
.assertEqual(data
, zlib
.decompress(combuf
, -12, -5),
218 'compressed data failure')
220 dco
= zlib
.decompressobj(-12)
224 #max_length = 1 + len(cb)//10
225 chunk
= dco
.decompress(cb
, dcx
)
226 self
.failIf(len(chunk
) > dcx
,
227 'chunk too big (%d>%d)' % (len(chunk
), dcx
))
229 cb
= dco
.unconsumed_tail
231 bufs
.append(dco
.flush())
234 chunk
= dco
.decompress('', dcx
)
235 self
.failIf(len(chunk
) > dcx
,
236 'chunk too big in tail (%d>%d)' % (len(chunk
), dcx
))
241 self
.assertEqual(len(data
), len(''.join(bufs
)))
242 self
.assertEqual(data
, ''.join(bufs
), 'Wrong data retrieved')
244 def test_decompressmaxlen(self
):
245 # Check a decompression object with max_length specified
246 data
= hamlet_scene
* 8 * 16
247 co
= zlib
.compressobj(2, 8, -12, 9, 1)
249 for i
in range(0, len(data
), 256):
250 bufs
.append(co
.compress(data
[i
:i
+256]))
251 bufs
.append(co
.flush())
252 combuf
= ''.join(bufs
)
253 self
.assertEqual(data
, zlib
.decompress(combuf
, -12, -5),
254 'compressed data failure')
256 dco
= zlib
.decompressobj(-12)
260 max_length
= 1 + len(cb
)//10
261 chunk
= dco
.decompress(cb
, max_length
)
262 self
.failIf(len(chunk
) > max_length
,
263 'chunk too big (%d>%d)' % (len(chunk
),max_length
))
265 cb
= dco
.unconsumed_tail
266 bufs
.append(dco
.flush())
267 self
.assertEqual(len(data
), len(''.join(bufs
)))
268 self
.assertEqual(data
, ''.join(bufs
), 'Wrong data retrieved')
270 def test_decompressmaxlenflushless(self
):
271 # identical to test_decompressmaxlen except flush is replaced
272 # with an equivalent. This works and other fails on (eg) 2.2.2
273 data
= hamlet_scene
* 8 * 16
274 co
= zlib
.compressobj(2, 8, -12, 9, 1)
276 for i
in range(0, len(data
), 256):
277 bufs
.append(co
.compress(data
[i
:i
+256]))
278 bufs
.append(co
.flush())
279 combuf
= ''.join(bufs
)
280 self
.assertEqual(data
, zlib
.decompress(combuf
, -12, -5),
281 'compressed data mismatch')
283 dco
= zlib
.decompressobj(-12)
287 max_length
= 1 + len(cb
)//10
288 chunk
= dco
.decompress(cb
, max_length
)
289 self
.failIf(len(chunk
) > max_length
,
290 'chunk too big (%d>%d)' % (len(chunk
),max_length
))
292 cb
= dco
.unconsumed_tail
294 #bufs.append(dco.flush())
296 chunk
= dco
.decompress('', max_length
)
297 self
.failIf(len(chunk
) > max_length
,
298 'chunk too big (%d>%d)' % (len(chunk
),max_length
))
301 self
.assertEqual(data
, ''.join(bufs
), 'Wrong data retrieved')
303 def test_maxlenmisc(self
):
304 # Misc tests of max_length
305 dco
= zlib
.decompressobj(-12)
306 self
.assertRaises(ValueError, dco
.decompress
, "", -1)
307 self
.assertEqual('', dco
.unconsumed_tail
)
309 def test_flushes(self
):
310 # Test flush() with the various options, using all the
311 # different levels in order to provide more variations.
312 sync_opt
= ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
313 sync_opt
= [getattr(zlib
, opt
) for opt
in sync_opt
314 if hasattr(zlib
, opt
)]
315 data
= hamlet_scene
* 8
317 for sync
in sync_opt
:
318 for level
in range(10):
319 obj
= zlib
.compressobj( level
)
320 a
= obj
.compress( data
[:3000] )
321 b
= obj
.flush( sync
)
322 c
= obj
.compress( data
[3000:] )
324 self
.assertEqual(zlib
.decompress(''.join([a
,b
,c
,d
])),
325 data
, ("Decompress failed: flush "
326 "mode=%i, level=%i") % (sync
, level
))
329 def test_odd_flush(self
):
330 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
333 if hasattr(zlib
, 'Z_SYNC_FLUSH'):
334 # Testing on 17K of "random" data
336 # Create compressor and decompressor objects
337 co
= zlib
.compressobj(9)
338 dco
= zlib
.decompressobj()
341 # generate random data stream
343 # In 2.3 and later, WichmannHill is the RNG of the bug report
344 gen
= random
.WichmannHill()
345 except AttributeError:
347 # 2.2 called it Random
348 gen
= random
.Random()
349 except AttributeError:
350 # others might simply have a single RNG
353 data
= genblock(1, 17 * 1024, generator
=gen
)
355 # compress, sync-flush, and decompress
356 first
= co
.compress(data
)
357 second
= co
.flush(zlib
.Z_SYNC_FLUSH
)
358 expanded
= dco
.decompress(first
+ second
)
360 # if decompressed data is different from the input data, choke.
361 self
.assertEqual(expanded
, data
, "17K random source doesn't match")
363 def test_manydecompinc(self
):
364 # Run incremental decompress test for a large range of sizes
365 self
.test_decompinc(sizes
=[1<<n
for n
in range(8)],
366 flush
=True, cx
=32, dcx
=4)
368 def test_manydecompimax(self
):
369 # Run incremental decompress maxlen test for a large range of sizes
370 # avoid the flush bug
371 self
.test_decompimax(sizes
=[1<<n
for n
in range(8)],
372 flush
=False, cx
=32, dcx
=4)
374 def test_manydecompimaxflush(self
):
375 # Run incremental decompress maxlen test for a large range of sizes
376 # avoid the flush bug
377 self
.test_decompimax(sizes
=[1<<n
for n
in range(8)],
378 flush
=True, cx
=32, dcx
=4)
381 def genblock(seed
, length
, step
=1024, generator
=random
):
382 """length-byte stream of random data from a seed (in step-byte blocks)."""
385 randint
= generator
.randint
386 if length
< step
or step
< 2:
389 for i
in range(0, length
, step
):
390 blocks
.append(''.join([chr(randint(0,255))
391 for x
in range(step
)]))
392 return ''.join(blocks
)[:length
]
396 def choose_lines(source
, number
, seed
=None, generator
=random
):
397 """Return a list of number lines randomly chosen from the source"""
400 sources
= source
.split('\n')
401 return [generator
.choice(sources
) for n
in range(number
)]
409 I stay too long: but here my father comes.
413 A double blessing is a double grace,
414 Occasion smiles upon a second leave.
418 Yet here, Laertes! aboard, aboard, for shame!
419 The wind sits in the shoulder of your sail,
420 And you are stay'd for. There; my blessing with thee!
421 And these few precepts in thy memory
422 See thou character. Give thy thoughts no tongue,
423 Nor any unproportioned thought his act.
424 Be thou familiar, but by no means vulgar.
425 Those friends thou hast, and their adoption tried,
426 Grapple them to thy soul with hoops of steel;
427 But do not dull thy palm with entertainment
428 Of each new-hatch'd, unfledged comrade. Beware
429 Of entrance to a quarrel, but being in,
430 Bear't that the opposed may beware of thee.
431 Give every man thy ear, but few thy voice;
432 Take each man's censure, but reserve thy judgment.
433 Costly thy habit as thy purse can buy,
434 But not express'd in fancy; rich, not gaudy;
435 For the apparel oft proclaims the man,
436 And they in France of the best rank and station
437 Are of a most select and generous chief in that.
438 Neither a borrower nor a lender be;
439 For loan oft loses both itself and friend,
440 And borrowing dulls the edge of husbandry.
441 This above all: to thine ownself be true,
442 And it must follow, as the night the day,
443 Thou canst not then be false to any man.
444 Farewell: my blessing season this in thee!
448 Most humbly do I take my leave, my lord.
452 The time invites you; go; your servants tend.
456 Farewell, Ophelia; and remember well
457 What I have said to you.
461 'Tis in my memory lock'd,
462 And you yourself shall keep the key of it.
471 test_support
.run_unittest(
475 CompressObjectTestCase
478 if __name__
== "__main__":
482 if not tests
: tests
= 'o'
484 if 'k' in tests
: testcases
.append(ChecksumTestCase
)
485 if 'x' in tests
: testcases
.append(ExceptionTestCase
)
486 if 'c' in tests
: testcases
.append(CompressTestCase
)
487 if 'o' in tests
: testcases
.append(CompressObjectTestCase
)
488 test_support
.run_unittest(*testcases
)
492 sys
.path
.insert(1, '/Py23Src/python/dist/src/Lib/test')
493 import test_zlib
as tz
494 ts
, ut
= tz
.test_support
, tz
.unittest
496 su
.addTest(ut
.makeSuite(tz
.CompressTestCase
))