Fix a bug in the ``compiler`` package that caused invalid code to be
[python/dscho.git] / Lib / sqlite3 / test / types.py
blob8da5722d55faff89d40272e3840a3e2f7eadc733
1 #-*- coding: ISO-8859-1 -*-
2 # pysqlite2/test/types.py: tests for type conversion and detection
4 # Copyright (C) 2005 Gerhard Häring <gh@ghaering.de>
6 # This file is part of pysqlite.
8 # This software is provided 'as-is', without any express or implied
9 # warranty. In no event will the authors be held liable for any damages
10 # arising from the use of this software.
12 # Permission is granted to anyone to use this software for any purpose,
13 # including commercial applications, and to alter it and redistribute it
14 # freely, subject to the following restrictions:
16 # 1. The origin of this software must not be misrepresented; you must not
17 # claim that you wrote the original software. If you use this software
18 # in a product, an acknowledgment in the product documentation would be
19 # appreciated but is not required.
20 # 2. Altered source versions must be plainly marked as such, and must not be
21 # misrepresented as being the original software.
22 # 3. This notice may not be removed or altered from any source distribution.
24 import bz2, datetime
25 import unittest
26 import sqlite3 as sqlite
28 class SqliteTypeTests(unittest.TestCase):
29 def setUp(self):
30 self.con = sqlite.connect(":memory:")
31 self.cur = self.con.cursor()
32 self.cur.execute("create table test(i integer, s varchar, f number, b blob)")
34 def tearDown(self):
35 self.cur.close()
36 self.con.close()
38 def CheckString(self):
39 self.cur.execute("insert into test(s) values (?)", (u"Österreich",))
40 self.cur.execute("select s from test")
41 row = self.cur.fetchone()
42 self.failUnlessEqual(row[0], u"Österreich")
44 def CheckSmallInt(self):
45 self.cur.execute("insert into test(i) values (?)", (42,))
46 self.cur.execute("select i from test")
47 row = self.cur.fetchone()
48 self.failUnlessEqual(row[0], 42)
50 def CheckLargeInt(self):
51 num = 2**40
52 self.cur.execute("insert into test(i) values (?)", (num,))
53 self.cur.execute("select i from test")
54 row = self.cur.fetchone()
55 self.failUnlessEqual(row[0], num)
57 def CheckFloat(self):
58 val = 3.14
59 self.cur.execute("insert into test(f) values (?)", (val,))
60 self.cur.execute("select f from test")
61 row = self.cur.fetchone()
62 self.failUnlessEqual(row[0], val)
64 def CheckBlob(self):
65 val = buffer("Guglhupf")
66 self.cur.execute("insert into test(b) values (?)", (val,))
67 self.cur.execute("select b from test")
68 row = self.cur.fetchone()
69 self.failUnlessEqual(row[0], val)
71 def CheckUnicodeExecute(self):
72 self.cur.execute(u"select 'Österreich'")
73 row = self.cur.fetchone()
74 self.failUnlessEqual(row[0], u"Österreich")
76 class DeclTypesTests(unittest.TestCase):
77 class Foo:
78 def __init__(self, _val):
79 self.val = _val
81 def __cmp__(self, other):
82 if not isinstance(other, DeclTypesTests.Foo):
83 raise ValueError
84 if self.val == other.val:
85 return 0
86 else:
87 return 1
89 def __conform__(self, protocol):
90 if protocol is sqlite.PrepareProtocol:
91 return self.val
92 else:
93 return None
95 def __str__(self):
96 return "<%s>" % self.val
98 def setUp(self):
99 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
100 self.cur = self.con.cursor()
101 self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob)")
103 # override float, make them always return the same number
104 sqlite.converters["FLOAT"] = lambda x: 47.2
106 # and implement two custom ones
107 sqlite.converters["BOOL"] = lambda x: bool(int(x))
108 sqlite.converters["FOO"] = DeclTypesTests.Foo
110 def tearDown(self):
111 del sqlite.converters["FLOAT"]
112 del sqlite.converters["BOOL"]
113 del sqlite.converters["FOO"]
114 self.cur.close()
115 self.con.close()
117 def CheckString(self):
118 # default
119 self.cur.execute("insert into test(s) values (?)", ("foo",))
120 self.cur.execute("select s from test")
121 row = self.cur.fetchone()
122 self.failUnlessEqual(row[0], "foo")
124 def CheckSmallInt(self):
125 # default
126 self.cur.execute("insert into test(i) values (?)", (42,))
127 self.cur.execute("select i from test")
128 row = self.cur.fetchone()
129 self.failUnlessEqual(row[0], 42)
131 def CheckLargeInt(self):
132 # default
133 num = 2**40
134 self.cur.execute("insert into test(i) values (?)", (num,))
135 self.cur.execute("select i from test")
136 row = self.cur.fetchone()
137 self.failUnlessEqual(row[0], num)
139 def CheckFloat(self):
140 # custom
141 val = 3.14
142 self.cur.execute("insert into test(f) values (?)", (val,))
143 self.cur.execute("select f from test")
144 row = self.cur.fetchone()
145 self.failUnlessEqual(row[0], 47.2)
147 def CheckBool(self):
148 # custom
149 self.cur.execute("insert into test(b) values (?)", (False,))
150 self.cur.execute("select b from test")
151 row = self.cur.fetchone()
152 self.failUnlessEqual(row[0], False)
154 self.cur.execute("delete from test")
155 self.cur.execute("insert into test(b) values (?)", (True,))
156 self.cur.execute("select b from test")
157 row = self.cur.fetchone()
158 self.failUnlessEqual(row[0], True)
160 def CheckUnicode(self):
161 # default
162 val = u"\xd6sterreich"
163 self.cur.execute("insert into test(u) values (?)", (val,))
164 self.cur.execute("select u from test")
165 row = self.cur.fetchone()
166 self.failUnlessEqual(row[0], val)
168 def CheckFoo(self):
169 val = DeclTypesTests.Foo("bla")
170 self.cur.execute("insert into test(foo) values (?)", (val,))
171 self.cur.execute("select foo from test")
172 row = self.cur.fetchone()
173 self.failUnlessEqual(row[0], val)
175 def CheckUnsupportedSeq(self):
176 class Bar: pass
177 val = Bar()
178 try:
179 self.cur.execute("insert into test(f) values (?)", (val,))
180 self.fail("should have raised an InterfaceError")
181 except sqlite.InterfaceError:
182 pass
183 except:
184 self.fail("should have raised an InterfaceError")
186 def CheckUnsupportedDict(self):
187 class Bar: pass
188 val = Bar()
189 try:
190 self.cur.execute("insert into test(f) values (:val)", {"val": val})
191 self.fail("should have raised an InterfaceError")
192 except sqlite.InterfaceError:
193 pass
194 except:
195 self.fail("should have raised an InterfaceError")
197 def CheckBlob(self):
198 # default
199 val = buffer("Guglhupf")
200 self.cur.execute("insert into test(bin) values (?)", (val,))
201 self.cur.execute("select bin from test")
202 row = self.cur.fetchone()
203 self.failUnlessEqual(row[0], val)
205 class ColNamesTests(unittest.TestCase):
206 def setUp(self):
207 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES|sqlite.PARSE_DECLTYPES)
208 self.cur = self.con.cursor()
209 self.cur.execute("create table test(x foo)")
211 sqlite.converters["FOO"] = lambda x: "[%s]" % x
212 sqlite.converters["BAR"] = lambda x: "<%s>" % x
213 sqlite.converters["EXC"] = lambda x: 5/0
215 def tearDown(self):
216 del sqlite.converters["FOO"]
217 del sqlite.converters["BAR"]
218 del sqlite.converters["EXC"]
219 self.cur.close()
220 self.con.close()
222 def CheckDeclType(self):
223 self.cur.execute("insert into test(x) values (?)", ("xxx",))
224 self.cur.execute("select x from test")
225 val = self.cur.fetchone()[0]
226 self.failUnlessEqual(val, "[xxx]")
228 def CheckNone(self):
229 self.cur.execute("insert into test(x) values (?)", (None,))
230 self.cur.execute("select x from test")
231 val = self.cur.fetchone()[0]
232 self.failUnlessEqual(val, None)
234 def CheckColName(self):
235 self.cur.execute("insert into test(x) values (?)", ("xxx",))
236 self.cur.execute('select x as "x [bar]" from test')
237 val = self.cur.fetchone()[0]
238 self.failUnlessEqual(val, "<xxx>")
240 # Check if the stripping of colnames works. Everything after the first
241 # whitespace should be stripped.
242 self.failUnlessEqual(self.cur.description[0][0], "x")
244 def CheckCursorDescriptionNoRow(self):
246 cursor.description should at least provide the column name(s), even if
247 no row returned.
249 self.cur.execute("select * from test where 0 = 1")
250 self.assert_(self.cur.description[0][0] == "x")
252 class ObjectAdaptationTests(unittest.TestCase):
253 def cast(obj):
254 return float(obj)
255 cast = staticmethod(cast)
257 def setUp(self):
258 self.con = sqlite.connect(":memory:")
259 try:
260 del sqlite.adapters[int]
261 except:
262 pass
263 sqlite.register_adapter(int, ObjectAdaptationTests.cast)
264 self.cur = self.con.cursor()
266 def tearDown(self):
267 del sqlite.adapters[(int, sqlite.PrepareProtocol)]
268 self.cur.close()
269 self.con.close()
271 def CheckCasterIsUsed(self):
272 self.cur.execute("select ?", (4,))
273 val = self.cur.fetchone()[0]
274 self.failUnlessEqual(type(val), float)
276 class BinaryConverterTests(unittest.TestCase):
277 def convert(s):
278 return bz2.decompress(s)
279 convert = staticmethod(convert)
281 def setUp(self):
282 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
283 sqlite.register_converter("bin", BinaryConverterTests.convert)
285 def tearDown(self):
286 self.con.close()
288 def CheckBinaryInputForConverter(self):
289 testdata = "abcdefg" * 10
290 result = self.con.execute('select ? as "x [bin]"', (buffer(bz2.compress(testdata)),)).fetchone()[0]
291 self.failUnlessEqual(testdata, result)
293 class DateTimeTests(unittest.TestCase):
294 def setUp(self):
295 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
296 self.cur = self.con.cursor()
297 self.cur.execute("create table test(d date, ts timestamp)")
299 def tearDown(self):
300 self.cur.close()
301 self.con.close()
303 def CheckSqliteDate(self):
304 d = sqlite.Date(2004, 2, 14)
305 self.cur.execute("insert into test(d) values (?)", (d,))
306 self.cur.execute("select d from test")
307 d2 = self.cur.fetchone()[0]
308 self.failUnlessEqual(d, d2)
310 def CheckSqliteTimestamp(self):
311 ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
312 self.cur.execute("insert into test(ts) values (?)", (ts,))
313 self.cur.execute("select ts from test")
314 ts2 = self.cur.fetchone()[0]
315 self.failUnlessEqual(ts, ts2)
317 def CheckSqlTimestamp(self):
318 # The date functions are only available in SQLite version 3.1 or later
319 if sqlite.sqlite_version_info < (3, 1):
320 return
322 # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time.
323 now = datetime.datetime.now()
324 self.cur.execute("insert into test(ts) values (current_timestamp)")
325 self.cur.execute("select ts from test")
326 ts = self.cur.fetchone()[0]
327 self.failUnlessEqual(type(ts), datetime.datetime)
328 self.failUnlessEqual(ts.year, now.year)
330 def CheckDateTimeSubSeconds(self):
331 ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
332 self.cur.execute("insert into test(ts) values (?)", (ts,))
333 self.cur.execute("select ts from test")
334 ts2 = self.cur.fetchone()[0]
335 self.failUnlessEqual(ts, ts2)
337 def suite():
338 sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check")
339 decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check")
340 colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check")
341 adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check")
342 bin_suite = unittest.makeSuite(BinaryConverterTests, "Check")
343 date_suite = unittest.makeSuite(DateTimeTests, "Check")
344 return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite))
346 def test():
347 runner = unittest.TextTestRunner()
348 runner.run(suite())
350 if __name__ == "__main__":
351 test()