1 #-*- coding: ISO-8859-1 -*-
2 # pysqlite2/test/types.py: tests for type conversion and detection
4 # Copyright (C) 2005 Gerhard Häring <gh@ghaering.de>
6 # This file is part of pysqlite.
8 # This software is provided 'as-is', without any express or implied
9 # warranty. In no event will the authors be held liable for any damages
10 # arising from the use of this software.
12 # Permission is granted to anyone to use this software for any purpose,
13 # including commercial applications, and to alter it and redistribute it
14 # freely, subject to the following restrictions:
16 # 1. The origin of this software must not be misrepresented; you must not
17 # claim that you wrote the original software. If you use this software
18 # in a product, an acknowledgment in the product documentation would be
19 # appreciated but is not required.
20 # 2. Altered source versions must be plainly marked as such, and must not be
21 # misrepresented as being the original software.
22 # 3. This notice may not be removed or altered from any source distribution.
26 import sqlite3
as sqlite
28 class SqliteTypeTests(unittest
.TestCase
):
30 self
.con
= sqlite
.connect(":memory:")
31 self
.cur
= self
.con
.cursor()
32 self
.cur
.execute("create table test(i integer, s varchar, f number, b blob)")
def CheckString(self):
    """Insert a non-ASCII unicode string into column ``s`` and read it back.

    Uses ``self.cur``, the cursor the fixture opens on the in-memory
    database; the value fetched must equal the value inserted.
    """
    expected = u"Österreich"
    self.cur.execute("insert into test(s) values (?)", (expected,))
    self.cur.execute("select s from test")
    row = self.cur.fetchone()
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(row[0], expected)
def CheckSmallInt(self):
    """Insert the integer 42 into column ``i`` and expect 42 back."""
    self.cur.execute("insert into test(i) values (?)", (42,))
    self.cur.execute("select i from test")
    row = self.cur.fetchone()
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(row[0], 42)
50 def CheckLargeInt(self
):
52 self
.cur
.execute("insert into test(i) values (?)", (num
,))
53 self
.cur
.execute("select i from test")
54 row
= self
.cur
.fetchone()
55 self
.failUnlessEqual(row
[0], num
)
59 self
.cur
.execute("insert into test(f) values (?)", (val
,))
60 self
.cur
.execute("select f from test")
61 row
= self
.cur
.fetchone()
62 self
.failUnlessEqual(row
[0], val
)
65 val
= buffer("Guglhupf")
66 self
.cur
.execute("insert into test(b) values (?)", (val
,))
67 self
.cur
.execute("select b from test")
68 row
= self
.cur
.fetchone()
69 self
.failUnlessEqual(row
[0], val
)
def CheckUnicodeExecute(self):
    """Execute a unicode SQL statement containing a non-ASCII literal.

    The statement text itself (not a bound parameter) carries the
    non-ASCII characters; the selected value must compare equal to the
    same unicode string.
    """
    self.cur.execute(u"select 'Österreich'")
    row = self.cur.fetchone()
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(row[0], u"Österreich")
76 class DeclTypesTests(unittest
.TestCase
):
78 def __init__(self
, _val
):
81 def __cmp__(self
, other
):
82 if not isinstance(other
, DeclTypesTests
.Foo
):
84 if self
.val
== other
.val
:
89 def __conform__(self
, protocol
):
90 if protocol
is sqlite
.PrepareProtocol
:
96 return "<%s>" % self
.val
99 self
.con
= sqlite
.connect(":memory:", detect_types
=sqlite
.PARSE_DECLTYPES
)
100 self
.cur
= self
.con
.cursor()
101 self
.cur
.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob)")
103 # override float, make them always return the same number
104 sqlite
.converters
["FLOAT"] = lambda x
: 47.2
106 # and implement two custom ones
107 sqlite
.converters
["BOOL"] = lambda x
: bool(int(x
))
108 sqlite
.converters
["FOO"] = DeclTypesTests
.Foo
111 del sqlite
.converters
["FLOAT"]
112 del sqlite
.converters
["BOOL"]
113 del sqlite
.converters
["FOO"]
def CheckString(self):
    """Round-trip a plain string through column ``s`` (declared ``str``).

    The visible fixture lines register converters only for FLOAT, BOOL
    and FOO, so this exercises the default (unconverted) path.
    """
    self.cur.execute("insert into test(s) values (?)", ("foo",))
    self.cur.execute("select s from test")
    row = self.cur.fetchone()
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(row[0], "foo")
def CheckSmallInt(self):
    """Round-trip the integer 42 through column ``i`` (declared ``int``).

    The visible fixture lines register converters only for FLOAT, BOOL
    and FOO, so this exercises the default (unconverted) path.
    """
    self.cur.execute("insert into test(i) values (?)", (42,))
    self.cur.execute("select i from test")
    row = self.cur.fetchone()
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(row[0], 42)
131 def CheckLargeInt(self
):
134 self
.cur
.execute("insert into test(i) values (?)", (num
,))
135 self
.cur
.execute("select i from test")
136 row
= self
.cur
.fetchone()
137 self
.failUnlessEqual(row
[0], num
)
139 def CheckFloat(self
):
142 self
.cur
.execute("insert into test(f) values (?)", (val
,))
143 self
.cur
.execute("select f from test")
144 row
= self
.cur
.fetchone()
145 self
.failUnlessEqual(row
[0], 47.2)
149 self
.cur
.execute("insert into test(b) values (?)", (False,))
150 self
.cur
.execute("select b from test")
151 row
= self
.cur
.fetchone()
152 self
.failUnlessEqual(row
[0], False)
154 self
.cur
.execute("delete from test")
155 self
.cur
.execute("insert into test(b) values (?)", (True,))
156 self
.cur
.execute("select b from test")
157 row
= self
.cur
.fetchone()
158 self
.failUnlessEqual(row
[0], True)
def CheckUnicode(self):
    """Round-trip a non-ASCII unicode value through column ``u``.

    Column ``u`` is declared ``unicode``; the visible fixture lines
    register no converter under that name, so the value is expected to
    come back equal to what was inserted.
    """
    val = u"\xd6sterreich"
    self.cur.execute("insert into test(u) values (?)", (val,))
    self.cur.execute("select u from test")
    row = self.cur.fetchone()
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(row[0], val)
169 val
= DeclTypesTests
.Foo("bla")
170 self
.cur
.execute("insert into test(foo) values (?)", (val
,))
171 self
.cur
.execute("select foo from test")
172 row
= self
.cur
.fetchone()
173 self
.failUnlessEqual(row
[0], val
)
175 def CheckUnsupportedSeq(self
):
179 self
.cur
.execute("insert into test(f) values (?)", (val
,))
180 self
.fail("should have raised an InterfaceError")
181 except sqlite
.InterfaceError
:
184 self
.fail("should have raised an InterfaceError")
186 def CheckUnsupportedDict(self
):
190 self
.cur
.execute("insert into test(f) values (:val)", {"val": val
})
191 self
.fail("should have raised an InterfaceError")
192 except sqlite
.InterfaceError
:
195 self
.fail("should have raised an InterfaceError")
199 val
= buffer("Guglhupf")
200 self
.cur
.execute("insert into test(bin) values (?)", (val
,))
201 self
.cur
.execute("select bin from test")
202 row
= self
.cur
.fetchone()
203 self
.failUnlessEqual(row
[0], val
)
205 class ColNamesTests(unittest
.TestCase
):
207 self
.con
= sqlite
.connect(":memory:", detect_types
=sqlite
.PARSE_COLNAMES|sqlite
.PARSE_DECLTYPES
)
208 self
.cur
= self
.con
.cursor()
209 self
.cur
.execute("create table test(x foo)")
211 sqlite
.converters
["FOO"] = lambda x
: "[%s]" % x
212 sqlite
.converters
["BAR"] = lambda x
: "<%s>" % x
213 sqlite
.converters
["EXC"] = lambda x
: 5/0
216 del sqlite
.converters
["FOO"]
217 del sqlite
.converters
["BAR"]
218 del sqlite
.converters
["EXC"]
def CheckDeclType(self):
    """Check that the declared column type selects the converter.

    Column ``x`` is declared ``foo`` and the fixture registers a FOO
    converter that wraps the value in square brackets, so "xxx" must be
    read back as "[xxx]".
    """
    self.cur.execute("insert into test(x) values (?)", ("xxx",))
    self.cur.execute("select x from test")
    val = self.cur.fetchone()[0]
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(val, "[xxx]")
229 self
.cur
.execute("insert into test(x) values (?)", (None,))
230 self
.cur
.execute("select x from test")
231 val
= self
.cur
.fetchone()[0]
232 self
.failUnlessEqual(val
, None)
def CheckColName(self):
    """Check that a ``[type]`` suffix in the column alias picks the converter.

    The alias ``"x [bar]"`` must route the value through the BAR converter
    registered by the fixture (which wraps it in angle brackets), and the
    reported column name must have the suffix removed.
    """
    self.cur.execute("insert into test(x) values (?)", ("xxx",))
    self.cur.execute('select x as "x [bar]" from test')
    val = self.cur.fetchone()[0]
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(val, "<xxx>")

    # Check if the stripping of colnames works. Everything after the first
    # whitespace should be stripped.
    self.assertEqual(self.cur.description[0][0], "x")
def CheckCursorDescriptionNoRow(self):
    """
    cursor.description should at least provide the column name(s), even if
    the query (here filtered by ``where 0 = 1``) yields no rows.
    """
    self.cur.execute("select * from test where 0 = 1")
    # assertEqual reports both operands on failure, unlike the deprecated
    # assert_ alias applied to an already-evaluated == expression.
    self.assertEqual(self.cur.description[0][0], "x")
252 class ObjectAdaptationTests(unittest
.TestCase
):
255 cast
= staticmethod(cast
)
258 self
.con
= sqlite
.connect(":memory:")
260 del sqlite
.adapters
[int]
263 sqlite
.register_adapter(int, ObjectAdaptationTests
.cast
)
264 self
.cur
= self
.con
.cursor()
267 del sqlite
.adapters
[(int, sqlite
.PrepareProtocol
)]
def CheckCasterIsUsed(self):
    """Check that the adapter registered for ``int`` is applied to parameters.

    The fixture registers ``ObjectAdaptationTests.cast`` as the adapter for
    ``int``; binding the integer 4 is expected to yield a ``float`` when the
    value is selected back.
    """
    self.cur.execute("select ?", (4,))
    val = self.cur.fetchone()[0]
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(type(val), float)
276 class BinaryConverterTests(unittest
.TestCase
):
278 return bz2
.decompress(s
)
279 convert
= staticmethod(convert
)
282 self
.con
= sqlite
.connect(":memory:", detect_types
=sqlite
.PARSE_COLNAMES
)
283 sqlite
.register_converter("bin", BinaryConverterTests
.convert
)
def CheckBinaryInputForConverter(self):
    """Check that a converter receives binary input intact.

    A bz2-compressed payload is bound as a binary parameter and selected
    under the alias ``"x [bin]"``; the fixture registers
    ``BinaryConverterTests.convert`` (bz2 decompression) for "bin", so the
    result must equal the uncompressed payload.
    """
    # A bytes literal and sqlite.Binary behave the same as the former
    # str literal / buffer() on Python 2 and also work on Python 3, where
    # buffer() does not exist and bz2 only accepts bytes.
    testdata = b"abcdefg" * 10
    blob = sqlite.Binary(bz2.compress(testdata))
    result = self.con.execute('select ? as "x [bin]"', (blob,)).fetchone()[0]
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(testdata, result)
293 class DateTimeTests(unittest
.TestCase
):
295 self
.con
= sqlite
.connect(":memory:", detect_types
=sqlite
.PARSE_DECLTYPES
)
296 self
.cur
= self
.con
.cursor()
297 self
.cur
.execute("create table test(d date, ts timestamp)")
def CheckSqliteDate(self):
    """Round-trip a ``sqlite.Date`` through column ``d`` (declared ``date``).

    The connection is opened with PARSE_DECLTYPES, so the value selected
    back must compare equal to the date object that was inserted.
    """
    d = sqlite.Date(2004, 2, 14)
    self.cur.execute("insert into test(d) values (?)", (d,))
    self.cur.execute("select d from test")
    d2 = self.cur.fetchone()[0]
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(d, d2)
def CheckSqliteTimestamp(self):
    """Round-trip a ``sqlite.Timestamp`` through column ``ts``.

    Column ``ts`` is declared ``timestamp`` and the connection is opened
    with PARSE_DECLTYPES, so the value selected back must compare equal to
    the timestamp object that was inserted.
    """
    ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
    self.cur.execute("insert into test(ts) values (?)", (ts,))
    self.cur.execute("select ts from test")
    ts2 = self.cur.fetchone()[0]
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(ts, ts2)
def CheckSqlTimestamp(self):
    """Check that SQL ``current_timestamp`` converts to a datetime.

    Inserts ``current_timestamp`` into column ``ts`` and verifies that the
    value read back is a ``datetime.datetime`` whose year matches the
    current UTC year.
    """
    import time  # local import: only this check needs it

    # The date functions are only available in SQLite version 3.1 or later
    # NOTE(review): the guard's body was not visible in the reviewed
    # source; an early return (skip the check) is assumed - confirm.
    if sqlite.sqlite_version_info < (3, 1):
        return

    self.cur.execute("insert into test(ts) values (current_timestamp)")
    self.cur.execute("select ts from test")
    ts = self.cur.fetchone()[0]
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(type(ts), datetime.datetime)
    # SQLite's current_timestamp uses UTC time, so compare against the UTC
    # year: the local-time year can differ from it around New Year, which
    # made the former datetime.datetime.now() comparison fail spuriously.
    self.assertEqual(ts.year, time.gmtime().tm_year)
def CheckDateTimeSubSeconds(self):
    """Round-trip a timestamp that carries a microsecond component.

    The inserted ``sqlite.Timestamp`` has 500000 microseconds; the value
    selected back from column ``ts`` must compare equal, i.e. the
    sub-second part must survive storage and conversion.
    """
    ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
    self.cur.execute("insert into test(ts) values (?)", (ts,))
    self.cur.execute("select ts from test")
    ts2 = self.cur.fetchone()[0]
    # assertEqual instead of the deprecated failUnlessEqual alias, which
    # current unittest releases no longer provide.
    self.assertEqual(ts, ts2)
338 sqlite_type_suite
= unittest
.makeSuite(SqliteTypeTests
, "Check")
339 decltypes_type_suite
= unittest
.makeSuite(DeclTypesTests
, "Check")
340 colnames_type_suite
= unittest
.makeSuite(ColNamesTests
, "Check")
341 adaptation_suite
= unittest
.makeSuite(ObjectAdaptationTests
, "Check")
342 bin_suite
= unittest
.makeSuite(BinaryConverterTests
, "Check")
343 date_suite
= unittest
.makeSuite(DateTimeTests
, "Check")
344 return unittest
.TestSuite((sqlite_type_suite
, decltypes_type_suite
, colnames_type_suite
, adaptation_suite
, bin_suite
, date_suite
))
347 runner
= unittest
.TextTestRunner()
350 if __name__
== "__main__":