#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2008 Zuza Software Foundation
#
# This file is part of translate.
#
# translate is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# translate is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with translate; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import __init__ as indexer
import CommonIndexer
import os
import sys
import shutil

DATABASE = "tmp-index"

# overwrite this value to change the preferred indexing engine
default_engine = ""
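# e.g. to test only the Xapian engine, one could override the value here
# (the __main__ block below performs the same override once per engine):
#default_engine = "XapianIndexer"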

# order of tests to be done
ORDER_OF_TESTS = ["XapianIndexer", "PyLuceneIndexer", "PyLuceneIndexer1"]

def _get_indexer(location):
    """wrapper around 'indexer.get_indexer' that honours the globally
    preferred indexing engine

    create an indexer based on the preference order 'default_engine'

    @param location: the path of the database to be created/opened
    @type location: str
    @return: the resulting indexing engine instance
    @rtype: CommonIndexer.CommonDatabase
    """
    return indexer.get_indexer(location, [default_engine])

def clean_database():
    """remove an existing database"""
    dbase_dir = os.path.abspath(DATABASE)
    # nothing to do, if the database directory does not exist
    if not os.path.exists(dbase_dir):
        return
    # recursively remove the directory
    shutil.rmtree(dbase_dir)

def create_example_content(database):
    """add some defined documents to the database

    this may be used to check some specific queries

    @param database: an indexing database object
    @type database: CommonIndexer.CommonDatabase
    """
    # a reasonable foo-bar entry
    database.index_document(["foo", "bar", "med"])
    # and something more for another document
    database.index_document(["foo", "bar", "HELO"])
    # another similar one - but with "barr" instead of "bar"
    database.index_document(["foo", "barr", "med", "HELO"])
    # some field indexed document data
    database.index_document({"fname1": "foo_field1", "fname2": "foo_field2"})
    database.index_document({"fname1": "bar_field1", "fname2": "foo_field2",
            None: ["HELO", "foo"]})
    database.index_document({None: "med"})
    # for tokenizing tests
    database.set_field_analyzers({
            "fname1": database.ANALYZER_PARTIAL | database.ANALYZER_TOKENIZE,
            "fname2": database.ANALYZER_EXACT})
    database.index_document({"fname1": "qaz wsx", None: "edc rfv"})
    database.index_document({"fname2": "qaz wsx", None: "edc rfv"})
    # check a filename with the exact analyzer
    database.index_document({"fname2": "foo-bar.po"})
    assert _get_number_of_docs(database) == 9

def test_create_database():
    """create a new database from scratch"""
    # clean up everything first
    clean_database()
    new_db = _get_indexer(DATABASE)
    assert isinstance(new_db, CommonIndexer.CommonDatabase)
    assert os.path.exists(DATABASE)
    # clean up
    clean_database()

def test_open_database():
    """open an existing database"""
    # clean up everything first
    clean_database()
    # create a new database - it will be closed immediately afterwards,
    # since the reference is lost again
    _get_indexer(DATABASE)
    # open the existing database again
    opened_db = _get_indexer(DATABASE)
    assert isinstance(opened_db, CommonIndexer.CommonDatabase)
    # clean up
    clean_database()

def test_make_queries():
    """create a simple query from a plain string"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    # plaintext queries
    q_plain1 = new_db.make_query("foo")
    q_plain2 = new_db.make_query("foo bar")
    assert str(q_plain1) != str(q_plain2)
    # list 'and/or'
    q_combined_and = new_db.make_query([new_db.make_query("foo"),
            new_db.make_query("bar")])
    q_combined_or = new_db.make_query([new_db.make_query("foo"),
            new_db.make_query("bar")], require_all=False)
    assert str(q_combined_or) != str(q_combined_and)

def test_partial_text_matching():
    """check if implicit and explicit partial text matching works"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    # this query should return two matches (partial matching disabled)
    q_plain_partial1 = new_db.make_query("bar",
            analyzer=(new_db.analyzer ^ new_db.ANALYZER_PARTIAL))
    r_plain_partial1 = new_db.get_query_result(q_plain_partial1).get_matches(0, 10)
    assert r_plain_partial1[0] == 2
    # this query should return three matches (partial matching enabled)
    q_plain_partial2 = new_db.make_query("bar", analyzer=new_db.ANALYZER_PARTIAL)
    r_plain_partial2 = new_db.get_query_result(q_plain_partial2).get_matches(0, 10)
    assert r_plain_partial2[0] == 3
    # return two matches (the explicit wildcard is ignored without ANALYZER_PARTIAL)
    q_plain_partial3 = new_db.make_query("bar*",
            analyzer=(new_db.analyzer ^ new_db.ANALYZER_PARTIAL))
    r_plain_partial3 = new_db.get_query_result(q_plain_partial3).get_matches(0, 10)
    assert r_plain_partial3[0] == 2
    # partial matching at the start of the string
    # TODO: enable this as soon as partial matching works at the beginning of text
    #q_plain_partial4 = new_db.make_query("*ar",
    #        analyzer=new_db.ANALYZER_EXACT)
    #        analyzer=(new_db.analyzer ^ new_db.ANALYZER_PARTIAL))
    #r_plain_partial4 = new_db.get_query_result(q_plain_partial4).get_matches(0, 10)
    #assert r_plain_partial4[0] == 2
    # clean up
    clean_database()

def test_field_matching():
    """test if field specific searching works"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    # do a field search with a tuple argument
    q_field1 = new_db.make_query(("fname1", "foo_field1"))
    r_field1 = new_db.get_query_result(q_field1).get_matches(0, 10)
    assert r_field1[0] == 1
    # do a field search with a dict argument
    q_field2 = new_db.make_query({"fname1": "bar_field1"})
    r_field2 = new_db.get_query_result(q_field2).get_matches(0, 10)
    assert r_field2[0] == 1
    # do an incomplete field search with a dict argument - should not match
    q_field3 = new_db.make_query({"fname2": "foo_field"})
    r_field3 = new_db.get_query_result(q_field3).get_matches(0, 10)
    assert r_field3[0] == 0
    # do an AND field search with a dict argument
    q_field4 = new_db.make_query({"fname1": "foo_field1",
            "fname2": "foo_field2"}, require_all=True)
    r_field4 = new_db.get_query_result(q_field4).get_matches(0, 10)
    assert r_field4[0] == 1
    # do an OR field search with a dict argument
    q_field5 = new_db.make_query({"fname1": "foo_field1",
            "fname2": "foo_field2"}, require_all=False)
    r_field5 = new_db.get_query_result(q_field5).get_matches(0, 10)
    assert r_field5[0] == 2
    # do an incomplete field search with a partial field analyzer
    q_field6 = new_db.make_query({"fname1": "foo_field"},
            analyzer=new_db.ANALYZER_PARTIAL)
    r_field6 = new_db.get_query_result(q_field6).get_matches(0, 10)
    assert r_field6[0] == 1
    # clean up
    clean_database()

def test_field_analyzers():
    """test if we can change the analyzer of specific fields"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    # do an incomplete field search with the partial analyzer
    # (configured for this field in 'create_example_content')
    q_field1 = new_db.make_query({"fname1": "bar_field"})
    r_field1 = new_db.get_query_result(q_field1).get_matches(0, 10)
    assert r_field1[0] == 1
    # check the get/set field analyzer functions
    old_analyzer = new_db.get_field_analyzers("fname1")
    new_db.set_field_analyzers({"fname1": new_db.ANALYZER_EXACT})
    assert new_db.get_field_analyzers("fname1") == new_db.ANALYZER_EXACT
    new_db.set_field_analyzers({"fname1": new_db.ANALYZER_PARTIAL})
    assert new_db.get_field_analyzers("fname1") == new_db.ANALYZER_PARTIAL
    # restore the previous setting
    new_db.set_field_analyzers({"fname1": old_analyzer})
    # check if ANALYZER_TOKENIZE is the default
    assert (new_db.get_field_analyzers("thisFieldDoesNotExist")
            & new_db.ANALYZER_TOKENIZE) > 0
    # do an incomplete field search - now with an explicit partial analyzer
    q_field2 = new_db.make_query({"fname1": "bar_field"},
            analyzer=new_db.ANALYZER_PARTIAL)
    r_field2 = new_db.get_query_result(q_field2).get_matches(0, 10)
    assert r_field2[0] == 1
    # clean up
    clean_database()

def test_and_queries():
    """test if AND queries work as expected"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    # do an AND query (partial matching disabled)
    q_and1 = new_db.make_query("foo bar",
            analyzer=(new_db.analyzer ^ new_db.ANALYZER_PARTIAL))
    r_and1 = new_db.get_query_result(q_and1).get_matches(0, 10)
    assert r_and1[0] == 2
    # do the same AND query in a different way
    q_and2 = new_db.make_query(["foo", "bar"],
            analyzer=(new_db.analyzer ^ new_db.ANALYZER_PARTIAL))
    r_and2 = new_db.get_query_result(q_and2).get_matches(0, 10)
    assert r_and2[0] == 2
    # do an AND query without results
    q_and3 = new_db.make_query(["HELO", "bar", "med"],
            analyzer=(new_db.analyzer ^ new_db.ANALYZER_PARTIAL))
    r_and3 = new_db.get_query_result(q_and3).get_matches(0, 10)
    assert r_and3[0] == 0
    # clean up
    clean_database()

def test_or_queries():
    """test if OR queries work as expected"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    # do an OR query
    q_or1 = new_db.make_query("foo bar", require_all=False)
    r_or1 = new_db.get_query_result(q_or1).get_matches(0, 10)
    assert r_or1[0] == 4
    # do the same OR query in a different way
    q_or2 = new_db.make_query(["foo", "bar"], require_all=False)
    r_or2 = new_db.get_query_result(q_or2).get_matches(0, 10)
    assert r_or2[0] == r_or1[0]
    # do an OR query with lots of results
    q_or3 = new_db.make_query(["HELO", "bar", "med"], require_all=False)
    r_or3 = new_db.get_query_result(q_or3).get_matches(0, 10)
    assert r_or3[0] == 5
    # clean up
    clean_database()

def test_lower_upper_case():
    """test if case is ignored for queries and for indexed terms"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    # use upper case search terms for lower case indexed terms
    q_case1 = new_db.make_query("BAR",
            analyzer=(new_db.analyzer ^ new_db.ANALYZER_PARTIAL))
    r_case1 = new_db.get_query_result(q_case1).get_matches(0, 10)
    assert r_case1[0] == 2
    # use lower case search terms for upper case indexed terms
    q_case2 = new_db.make_query("helo")
    r_case2 = new_db.get_query_result(q_case2).get_matches(0, 10)
    assert r_case2[0] == 3
    # use lower case search terms for lower case indexed terms
    q_case3 = new_db.make_query("bar",
            analyzer=(new_db.analyzer ^ new_db.ANALYZER_PARTIAL))
    r_case3 = new_db.get_query_result(q_case3).get_matches(0, 10)
    assert r_case3[0] == 2
    # use upper case search terms for upper case indexed terms
    q_case4 = new_db.make_query("HELO")
    r_case4 = new_db.get_query_result(q_case4).get_matches(0, 10)
    assert r_case4[0] == 3
    # clean up
    clean_database()

def test_tokenizing():
    """test if the TOKENIZE analyzer field setting is honoured"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    # check if the plain term was tokenized
    q_token1 = new_db.make_query("rfv")
    r_token1 = new_db.get_query_result(q_token1).get_matches(0, 10)
    assert r_token1[0] == 2
    # check if the field term was tokenized
    q_token2 = new_db.make_query({"fname1": "wsx"})
    r_token2 = new_db.get_query_result(q_token2).get_matches(0, 10)
    assert r_token2[0] == 1
    # check that the other field term was not tokenized
    q_token3 = new_db.make_query({"fname2": "wsx"})
    r_token3 = new_db.get_query_result(q_token3).get_matches(0, 10)
    assert r_token3[0] == 0
    # check that a filename is matched as a whole by the exact analyzer
    q_token4 = new_db.make_query({"fname2": "foo-bar.po"})
    #q_token4 = new_db.make_query("poo-foo.po")
    r_token4 = new_db.get_query_result(q_token4).get_matches(0, 10)
    # a failure here can be fixed by adding "TOKENIZE" to the field before
    # populating the database - this essentially splits the document term
    # into pieces
    assert r_token4[0] == 1
    # clean up
    clean_database()

def test_searching():
    """test if searching (retrieving specified field values) works"""
    # clean up everything first
    clean_database()
    # initialize the database with example content
    new_db = _get_indexer(DATABASE)
    create_example_content(new_db)
    q_search1 = new_db.make_query({"fname1": "bar_field1"})
    r_search1 = new_db.search(q_search1, ["fname2", None])
    assert len(r_search1) == 1
    dict_search1 = r_search1[0]
    assert dict_search1.has_key("fname2") and \
            (dict_search1["fname2"] == ["foo_field2"])
    # a simple way of checking that the second field list is also correct,
    # without caring about the order of the list
    assert dict_search1.has_key(None)
    # TODO: for now PyLucene cares about case, while Xapian does not - FIXME
    list_search1_sorted = [item.lower() for item in dict_search1[None]]
    list_search1_sorted.sort()
    assert list_search1_sorted == ["foo", "helo"]
    # clean up
    clean_database()

def show_database(database):
    """print the complete database - for debugging purposes"""
    if hasattr(database, "database"):
        # only the Xapian indexer exposes a 'database' attribute
        _show_database_xapian(database)
    else:
        _show_database_pylucene(database)

def _show_database_pylucene(database):
    """print all documents of a PyLucene database - for debugging purposes"""
    database.flush()
    reader = database.reader
    for index in range(reader.maxDoc()):
        print reader.document(index).toString().encode("charmap")

def _show_database_xapian(database):
    """print all documents of a Xapian database - for debugging purposes"""
    import xapian
    doccount = database.database.get_doccount()
    max_doc_index = database.database.get_lastdocid()
    print "Database overview: %d items up to index %d" % (doccount, max_doc_index)
    for index in range(1, max_doc_index + 1):
        try:
            document = database.database.get_document(index)
        except xapian.DocNotFoundError:
            # some document indexes may be unused
            continue
        # print the document's terms and their positions
        print "\tDocument [%d]: %s" % (index,
                str([(one_term.term, [posi for posi in one_term.positer])
                for one_term in document.termlist()]))

def _get_number_of_docs(database):
    """return the number of documents stored in the database"""
    if hasattr(database, "database"):
        # xapian
        return database.database.get_lastdocid()
    else:
        # pylucene
        database.flush()
        return database.reader.numDocs()

def get_engine_name(database):
    """return the module name of the database's indexing engine"""
    return database.__module__

def report_whitelisted_success(db, name):
    """ Output a warning message regarding a successful unittest that was
    supposed to fail for a specific indexing engine.
    As this test works now for the engine, the whitelisting should be removed.
    """
    print "the test '%s' works again for '%s' - please remove the exception" \
            % (name, get_engine_name(db))

def report_whitelisted_failure(db, name):
    """ Output a warning message regarding a unittest that was supposed to
    fail for a specific indexing engine.
    Since the test behaves as expected (it fails), this is just a reminder
    for developers about the open issues of the indexing engine support.
    """
    print "the test '%s' fails - as expected for '%s'" % (name,
            get_engine_name(db))

def assert_whitelisted(db, assert_value, white_list_engines, name_of_check):
    """ Do an assertion, but ignore failures for specific indexing engines.
    This can be used for almost-complete implementations that just need
    a little bit of improvement for full compliance.
    """
    try:
        assert assert_value
        if get_engine_name(db) in white_list_engines:
            report_whitelisted_success(db, name_of_check)
    except AssertionError:
        if get_engine_name(db) in white_list_engines:
            report_whitelisted_failure(db, name_of_check)
        else:
            raise
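
# A hypothetical usage sketch for 'assert_whitelisted' (the helper is not
# called by the tests above): it would wrap an assertion that is known to
# fail for certain engines, listing their names as reported by
# 'get_engine_name', e.g.:
#
#   assert_whitelisted(new_db, r_field6[0] == 1,
#           ["PyLuceneIndexer"], "partial field search")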

if __name__ == "__main__":
    # if an argument is given: use it as a database directory and show it
    if len(sys.argv) > 1:
        db = _get_indexer(sys.argv[1])
        show_database(db)
        sys.exit(0)
    for engine in ORDER_OF_TESTS:
        default_engine = engine
        # clean up the database of previously interrupted tests
        clean_database()
        engine_name = get_engine_name(_get_indexer(DATABASE))
        if engine_name == default_engine:
            print "************ running tests for '%s' *****************" \
                    % engine_name
        else:
            print "************ SKIPPING tests for '%s' *****************" \
                    % default_engine
            continue
        test_create_database()
        test_open_database()
        test_make_queries()
        test_partial_text_matching()
        test_field_matching()
        test_field_analyzers()
        test_and_queries()
        test_or_queries()
        test_lower_upper_case()
        test_tokenizing()
        test_searching()
        # TODO: add a test for document deletion
        # TODO: add a test for transaction handling
        # TODO: add a test for multiple engine/database handling in "get_indexer"
        clean_database()
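
# Note (an observation, not part of the original test driver): since all the
# test functions follow the "test_*" naming convention, a generic runner such
# as py.test should also collect them; in that case 'default_engine' keeps
# its empty default, so 'indexer.get_indexer' presumably falls back to
# whichever engine is available.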