move sections
[python/dscho.git] / Lib / bsddb / test / test_recno.py
blobf86a28bbce05d89d13fdd3f453aa352d1d61c865
1 """TestCases for exercising a Recno DB.
2 """
4 import os, sys
5 import errno
6 from pprint import pprint
7 import unittest
9 from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path
11 letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
14 #----------------------------------------------------------------------
16 class SimpleRecnoTestCase(unittest.TestCase):
17 if sys.version_info < (2, 4) :
18 def assertFalse(self, expr, msg=None) :
19 return self.failIf(expr,msg=msg)
20 def assertTrue(self, expr, msg=None) :
21 return self.assert_(expr, msg=msg)
23 if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
24 (sys.version_info < (3, 2))) :
25 def assertIsInstance(self, obj, datatype, msg=None) :
26 return self.assertEqual(type(obj), datatype, msg=msg)
27 def assertGreaterEqual(self, a, b, msg=None) :
28 return self.assertTrue(a>=b, msg=msg)
31 def setUp(self):
32 self.filename = get_new_database_path()
33 self.homeDir = None
35 def tearDown(self):
36 test_support.unlink(self.filename)
37 if self.homeDir:
38 test_support.rmtree(self.homeDir)
40 def test01_basic(self):
41 d = db.DB()
43 get_returns_none = d.set_get_returns_none(2)
44 d.set_get_returns_none(get_returns_none)
46 d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
48 for x in letters:
49 recno = d.append(x * 60)
50 self.assertIsInstance(recno, int)
51 self.assertGreaterEqual(recno, 1)
52 if verbose:
53 print recno,
55 if verbose: print
57 stat = d.stat()
58 if verbose:
59 pprint(stat)
61 for recno in range(1, len(d)+1):
62 data = d[recno]
63 if verbose:
64 print data
66 self.assertIsInstance(data, str)
67 self.assertEqual(data, d.get(recno))
69 try:
70 data = d[0] # This should raise a KeyError!?!?!
71 except db.DBInvalidArgError, val:
72 if sys.version_info < (2, 6) :
73 self.assertEqual(val[0], db.EINVAL)
74 else :
75 self.assertEqual(val.args[0], db.EINVAL)
76 if verbose: print val
77 else:
78 self.fail("expected exception")
80 # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2)
81 try:
82 d.has_key(0)
83 except db.DBError, val:
84 pass
85 else:
86 self.fail("has_key did not raise a proper exception")
88 try:
89 data = d[100]
90 except KeyError:
91 pass
92 else:
93 self.fail("expected exception")
95 try:
96 data = d.get(100)
97 except db.DBNotFoundError, val:
98 if get_returns_none:
99 self.fail("unexpected exception")
100 else:
101 self.assertEqual(data, None)
103 keys = d.keys()
104 if verbose:
105 print keys
106 self.assertIsInstance(keys, list)
107 self.assertIsInstance(keys[0], int)
108 self.assertEqual(len(keys), len(d))
110 items = d.items()
111 if verbose:
112 pprint(items)
113 self.assertIsInstance(items, list)
114 self.assertIsInstance(items[0], tuple)
115 self.assertEqual(len(items[0]), 2)
116 self.assertIsInstance(items[0][0], int)
117 self.assertIsInstance(items[0][1], str)
118 self.assertEqual(len(items), len(d))
120 self.assertTrue(d.has_key(25))
122 del d[25]
123 self.assertFalse(d.has_key(25))
125 d.delete(13)
126 self.assertFalse(d.has_key(13))
128 data = d.get_both(26, "z" * 60)
129 self.assertEqual(data, "z" * 60, 'was %r' % data)
130 if verbose:
131 print data
133 fd = d.fd()
134 if verbose:
135 print fd
137 c = d.cursor()
138 rec = c.first()
139 while rec:
140 if verbose:
141 print rec
142 rec = c.next()
144 c.set(50)
145 rec = c.current()
146 if verbose:
147 print rec
149 c.put(-1, "a replacement record", db.DB_CURRENT)
151 c.set(50)
152 rec = c.current()
153 self.assertEqual(rec, (50, "a replacement record"))
154 if verbose:
155 print rec
157 rec = c.set_range(30)
158 if verbose:
159 print rec
161 # test that non-existent key lookups work (and that
162 # DBC_set_range doesn't have a memleak under valgrind)
163 rec = c.set_range(999999)
164 self.assertEqual(rec, None)
165 if verbose:
166 print rec
168 c.close()
169 d.close()
171 d = db.DB()
172 d.open(self.filename)
173 c = d.cursor()
175 # put a record beyond the consecutive end of the recno's
176 d[100] = "way out there"
177 self.assertEqual(d[100], "way out there")
179 try:
180 data = d[99]
181 except KeyError:
182 pass
183 else:
184 self.fail("expected exception")
186 try:
187 d.get(99)
188 except db.DBKeyEmptyError, val:
189 if get_returns_none:
190 self.fail("unexpected DBKeyEmptyError exception")
191 else:
192 if sys.version_info < (2, 6) :
193 self.assertEqual(val[0], db.DB_KEYEMPTY)
194 else :
195 self.assertEqual(val.args[0], db.DB_KEYEMPTY)
196 if verbose: print val
197 else:
198 if not get_returns_none:
199 self.fail("expected exception")
201 rec = c.set(40)
202 while rec:
203 if verbose:
204 print rec
205 rec = c.next()
207 c.close()
208 d.close()
210 def test02_WithSource(self):
212 A Recno file that is given a "backing source file" is essentially a
213 simple ASCII file. Normally each record is delimited by \n and so is
214 just a line in the file, but you can set a different record delimiter
215 if needed.
217 homeDir = get_new_environment_path()
218 self.homeDir = homeDir
219 source = os.path.join(homeDir, 'test_recno.txt')
220 if not os.path.isdir(homeDir):
221 os.mkdir(homeDir)
222 f = open(source, 'w') # create the file
223 f.close()
225 d = db.DB()
226 # This is the default value, just checking if both int
227 d.set_re_delim(0x0A)
228 d.set_re_delim('\n') # and char can be used...
229 d.set_re_source(source)
230 d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
232 data = "The quick brown fox jumped over the lazy dog".split()
233 for datum in data:
234 d.append(datum)
235 d.sync()
236 d.close()
238 # get the text from the backing source
239 text = open(source, 'r').read()
240 text = text.strip()
241 if verbose:
242 print text
243 print data
244 print text.split('\n')
246 self.assertEqual(text.split('\n'), data)
248 # open as a DB again
249 d = db.DB()
250 d.set_re_source(source)
251 d.open(self.filename, db.DB_RECNO)
253 d[3] = 'reddish-brown'
254 d[8] = 'comatose'
256 d.sync()
257 d.close()
259 text = open(source, 'r').read()
260 text = text.strip()
261 if verbose:
262 print text
263 print text.split('\n')
265 self.assertEqual(text.split('\n'),
266 "The quick reddish-brown fox jumped over the comatose dog".split())
268 def test03_FixedLength(self):
269 d = db.DB()
270 d.set_re_len(40) # fixed length records, 40 bytes long
271 d.set_re_pad('-') # sets the pad character...
272 d.set_re_pad(45) # ...test both int and char
273 d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
275 for x in letters:
276 d.append(x * 35) # These will be padded
278 d.append('.' * 40) # this one will be exact
280 try: # this one will fail
281 d.append('bad' * 20)
282 except db.DBInvalidArgError, val:
283 if sys.version_info < (2, 6) :
284 self.assertEqual(val[0], db.EINVAL)
285 else :
286 self.assertEqual(val.args[0], db.EINVAL)
287 if verbose: print val
288 else:
289 self.fail("expected exception")
291 c = d.cursor()
292 rec = c.first()
293 while rec:
294 if verbose:
295 print rec
296 rec = c.next()
298 c.close()
299 d.close()
302 #----------------------------------------------------------------------
305 def test_suite():
306 return unittest.makeSuite(SimpleRecnoTestCase)
309 if __name__ == '__main__':
310 unittest.main(defaultTest='test_suite')