2 TestCases for exercising a Queue DB.
7 from pprint
import pprint
14 # For earlier Pythons w/distutils pybsddb
17 from test_all
import verbose
20 #----------------------------------------------------------------------
22 class SimpleQueueTestCase(unittest
.TestCase
):
24 self
.filename
= tempfile
.mktemp()
28 os
.remove(self
.filename
)
33 def test01_basic(self
):
34 # Basic Queue tests using the deprecated DBCursor.consume method.
38 print "Running %s.test01_basic..." % self
.__class
__.__name
__
41 d
.set_re_len(40) # Queues must be fixed length
42 d
.open(self
.filename
, db
.DB_QUEUE
, db
.DB_CREATE
)
45 print "before appends" + '-' * 30
48 for x
in string
.letters
:
53 d
.put(100, "some more data")
54 d
.put(101, "and some more ")
55 d
.put(75, "out of order")
56 d
.put(1, "replacement data")
61 print "before close" + '-' * 30
70 print "after open" + '-' * 30
77 print "after append" + '-' * 30
88 print "after consume loop" + '-' * 30
92 "if you see this message then you need to rebuild " \
93 "BerkeleyDB 3.1.17 with the patch in patches/qam_stat.diff"
99 def test02_basicPost32(self
):
100 # Basic Queue tests using the new DB.consume method in DB 3.2+
104 print '\n', '-=' * 30
105 print "Running %s.test02_basicPost32..." % self
.__class
__.__name
__
107 if db
.version() < (3, 2, 0):
109 print "Test not run, DB not new enough..."
113 d
.set_re_len(40) # Queues must be fixed length
114 d
.open(self
.filename
, db
.DB_QUEUE
, db
.DB_CREATE
)
117 print "before appends" + '-' * 30
120 for x
in string
.letters
:
125 d
.put(100, "some more data")
126 d
.put(101, "and some more ")
127 d
.put(75, "out of order")
128 d
.put(1, "replacement data")
133 print "before close" + '-' * 30
139 d
.open(self
.filename
)
140 #d.set_get_returns_none(true)
143 print "after open" + '-' * 30
149 print "after append" + '-' * 30
159 print "after consume loop" + '-' * 30
166 #----------------------------------------------------------------------
169 return unittest
.makeSuite(SimpleQueueTestCase
)
172 if __name__
== '__main__':
173 unittest
.main(defaultTest
='test_suite')