Move setting of ioready 'wait' earlier in call chain, to
[python/dscho.git] / Lib / bsddb / test / test_lock.py
blobaf10beab0806b6462031120253d97bd744fc11f9
1 """
2 TestCases for testing the locking sub-system.
3 """
5 import sys, os, string
6 import tempfile
7 import time
8 from pprint import pprint
9 from whrandom import random
11 try:
12 from threading import Thread, currentThread
13 have_threads = 1
14 except ImportError:
15 have_threads = 0
18 import unittest
19 from test_all import verbose
21 try:
22 # For Python 2.3
23 from bsddb import db
24 except ImportError:
25 # For earlier Pythons w/distutils pybsddb
26 from bsddb3 import db
29 #----------------------------------------------------------------------
31 class LockingTestCase(unittest.TestCase):
33 def setUp(self):
34 homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
35 self.homeDir = homeDir
36 try: os.mkdir(homeDir)
37 except os.error: pass
38 self.env = db.DBEnv()
39 self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
40 db.DB_INIT_LOCK | db.DB_CREATE)
43 def tearDown(self):
44 self.env.close()
45 import glob
46 files = glob.glob(os.path.join(self.homeDir, '*'))
47 for file in files:
48 os.remove(file)
51 def test01_simple(self):
52 if verbose:
53 print '\n', '-=' * 30
54 print "Running %s.test01_simple..." % self.__class__.__name__
56 anID = self.env.lock_id()
57 if verbose:
58 print "locker ID: %s" % anID
59 lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
60 if verbose:
61 print "Aquired lock: %s" % lock
62 time.sleep(1)
63 self.env.lock_put(lock)
64 if verbose:
65 print "Released lock: %s" % lock
70 def test02_threaded(self):
71 if verbose:
72 print '\n', '-=' * 30
73 print "Running %s.test02_threaded..." % self.__class__.__name__
75 threads = []
76 threads.append(Thread(target = self.theThread,
77 args=(5, db.DB_LOCK_WRITE)))
78 threads.append(Thread(target = self.theThread,
79 args=(1, db.DB_LOCK_READ)))
80 threads.append(Thread(target = self.theThread,
81 args=(1, db.DB_LOCK_READ)))
82 threads.append(Thread(target = self.theThread,
83 args=(1, db.DB_LOCK_WRITE)))
84 threads.append(Thread(target = self.theThread,
85 args=(1, db.DB_LOCK_READ)))
86 threads.append(Thread(target = self.theThread,
87 args=(1, db.DB_LOCK_READ)))
88 threads.append(Thread(target = self.theThread,
89 args=(1, db.DB_LOCK_WRITE)))
90 threads.append(Thread(target = self.theThread,
91 args=(1, db.DB_LOCK_WRITE)))
92 threads.append(Thread(target = self.theThread,
93 args=(1, db.DB_LOCK_WRITE)))
95 for t in threads:
96 t.start()
97 for t in threads:
98 t.join()
102 def theThread(self, sleepTime, lockType):
103 name = currentThread().getName()
104 if lockType == db.DB_LOCK_WRITE:
105 lt = "write"
106 else:
107 lt = "read"
109 anID = self.env.lock_id()
110 if verbose:
111 print "%s: locker ID: %s" % (name, anID)
113 lock = self.env.lock_get(anID, "some locked thing", lockType)
114 if verbose:
115 print "%s: Aquired %s lock: %s" % (name, lt, lock)
117 time.sleep(sleepTime)
119 self.env.lock_put(lock)
120 if verbose:
121 print "%s: Released %s lock: %s" % (name, lt, lock)
124 #----------------------------------------------------------------------
126 def test_suite():
127 suite = unittest.TestSuite()
129 if have_threads:
130 suite.addTest(unittest.makeSuite(LockingTestCase))
131 else:
132 suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
134 return suite
137 if __name__ == '__main__':
138 unittest.main(defaultTest='test_suite')