2 # -*- coding: utf-8 -*-
4 #Copyright 2008-2011 Steffen Schaumburg
5 #This program is free software: you can redistribute it and/or modify
6 #it under the terms of the GNU Affero General Public License as published by
7 #the Free Software Foundation, version 3 of the License.
9 #This program is distributed in the hope that it will be useful,
10 #but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 #GNU General Public License for more details.
14 #You should have received a copy of the GNU Affero General Public License
15 #along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #In the "official" distribution you can find the license in agpl-3.0.txt.
19 _
= L10n
.get_translation()
21 # Standard Library modules
23 import os
# todo: remove this once import_dir is in fpdb_import
25 from time
import time
, strftime
, sleep
, clock
31 from collections
import deque
# using Queue for now
35 # logging has been set up in fpdb.py or HUD_main.py, use their settings:
36 log
= logging
.getLogger("importer")
41 # fpdb/FreePokerTools modules
47 # database interface modules
51 log
.debug(_("Import database module: MySQLdb not found"))
58 log
.debug(_("Import database module: psycopg2 not found"))
60 import psycopg2
.extensions
61 psycopg2
.extensions
.register_type(psycopg2
.extensions
.UNICODE
)
64 def __init__(self
, caller
, settings
, config
, sql
= None, parent
= None):
66 self
.settings
= settings
72 #log = Configuration.get_logger("logging.conf", "importer", log_dir=self.config.dir_log)
76 self
.addToDirList
= {}
77 self
.removeFromFileList
= {} # to remove deleted files
82 self
.faobs
= None # File as one big string
83 self
.pos_in_file
= {} # dict to remember how far we have read in the file
85 self
.callHud
= self
.config
.get_import_parameters().get("callFpdbHud")
87 # CONFIGURATION OPTIONS
88 self
.settings
.setdefault("handCount", 0)
89 #self.settings.setdefault("allowHudcacheRebuild", True) # NOT USED NOW
90 #self.settings.setdefault("forceThreads", 2) # NOT USED NOW
91 self
.settings
.setdefault("writeQSize", 1000) # no need to change
92 self
.settings
.setdefault("writeQMaxWait", 10) # not used
93 self
.settings
.setdefault("dropIndexes", "don't drop")
94 self
.settings
.setdefault("dropHudCache", "don't drop")
95 self
.settings
.setdefault("starsArchive", False)
96 self
.settings
.setdefault("ftpArchive", False)
97 self
.settings
.setdefault("testData", False)
98 self
.settings
.setdefault("cacheHHC", False)
101 self
.database
= Database
.Database(self
.config
, sql
= self
.sql
)
103 self
.settings
.setdefault("threads", 1) # value set by GuiBulkImport
104 for i
in xrange(self
.settings
['threads']):
105 self
.writerdbs
.append( Database
.Database(self
.config
, sql
= self
.sql
) )
107 clock() # init clock in windows
# NOTE(review): only the signature of setCallHud is visible in this chunk;
# its body (original line 111) is missing from view — presumably it mirrors
# the sibling setters (``self.callHud = value``), but confirm against the
# complete file before relying on that.
110 def setCallHud(self
, value
):
def setCacheSessions(self, value):
    """Store the session-caching flag on this importer."""
    self.cacheSessions = value
def setHandCount(self, value):
    """Record the hand-count setting, coercing the given value to int."""
    self.settings['handCount'] = int(value)
def setQuiet(self, value):
    """Store the quiet-mode flag in the importer settings."""
    self.settings['quiet'] = value
def setFailOnError(self, value):
    """Store the fail-on-error flag in the importer settings."""
    self.settings['failOnError'] = value
def setHandsInDB(self, value):
    """Cache the current hand count of the database in the settings.

    Read later by the drop-index heuristics (calculate_auto*).
    """
    self.settings['handsInDB'] = value
def setThreads(self, value):
    """Set the writer-thread count and grow self.writerdbs to match.

    One extra Database connection is appended per additional thread;
    existing connections are never removed.
    """
    self.settings['threads'] = value
    shortfall = self.settings["threads"] - len(self.writerdbs)
    if shortfall > 0:
        for _unused in xrange(shortfall):
            self.writerdbs.append(Database.Database(self.config, sql=self.sql))
def setDropIndexes(self, value):
    """Record the drop-indexes policy ('drop', "don't drop", or 'auto')."""
    self.settings['dropIndexes'] = value
def setDropHudCache(self, value):
    """Record the drop-hudcache policy ('drop', "don't drop", or 'auto')."""
    self.settings['dropHudCache'] = value
def setStarsArchive(self, value):
    """Store the starsArchive flag (defaults to False in __init__)."""
    self.settings['starsArchive'] = value
def setFTPArchive(self, value):
    """Store the ftpArchive flag (defaults to False in __init__)."""
    self.settings['ftpArchive'] = value
def setPrintTestData(self, value):
    """Store the testData flag used to print insert test data."""
    self.settings['testData'] = value
def setFakeCacheHHC(self, value):
    """Store the cacheHHC flag; when set, import keeps the last converter
    so tests can retrieve it via getCachedHHC()."""
    self.settings['cacheHHC'] = value
def getCachedHHC(self):
    """Return the hand-history converter retained from the last import."""
    return self.handhistoryconverter
155 # def setWatchTime(self):
156 # self.updated = time()
158 def clearFileList(self
):
159 self
.updatedsize
= {}
161 self
.pos_in_file
= {}
165 self
.database
.disconnect()
166 for i
in xrange(len(self
.writerdbs
)):
167 self
.writerdbs
[i
].disconnect()
def logImport(self, type, file, stored, dups, partial, errs, ttime, id):
    """Record the outcome of importing one file in the database.

    ``type`` is the import mode (e.g. 'bulk' or 'auto'), ``id`` the file's
    database row id and ``ttime`` the elapsed time in seconds (stored
    scaled by 100).  Parameter names shadow builtins but are kept for
    interface compatibility with existing callers.
    """
    timestamp = datetime.datetime.utcnow()
    total_hands = stored + dups + partial + errs
    self.database.updateFile(
        [type, timestamp, timestamp, total_hands,
         stored, dups, partial, errs, ttime * 100, True, id])
175 def addFileToList(self
, file, site
, filter):
176 now
= datetime
.datetime
.utcnow()
177 file = os
.path
.splitext(os
.path
.basename(file))[0]
178 try: #TODO: this is a dirty hack. GBI needs it, GAI fails with it.
179 file = unicode(file, "utf8", "replace")
182 id = self
.database
.storeFile([file, site
, now
, now
, 0, 0, 0, 0, 0, 0, False])
183 self
.database
.commit()
184 return [site
] + [filter] + [id]
186 #Add an individual file to filelist
187 def addImportFile(self
, filename
, site
= "default", filter = "passthrough"):
188 #TODO: test it is a valid file -> put that in config!!
189 #print "addimportfile: filename is a", filename.__class__
190 # filename not guaranteed to be unicode
191 if filename
in self
.filelist
or not os
.path
.exists(filename
):
193 self
.filelist
[filename
] = self
.addFileToList(filename
, site
, filter)
194 if site
not in self
.siteIds
:
195 # Get id from Sites table in DB
196 result
= self
.database
.get_site_id(site
)
198 self
.siteIds
[site
] = result
[0][0]
201 log
.error(_("Database ID for %s not found") % site
)
203 log
.error(_("[ERROR] More than 1 Database ID found for %s - Multiple currencies not implemented yet") % site
)
206 # Called from GuiBulkImport to add a file or directory.
207 def addBulkImportImportFileOrDir(self
, inputPath
, site
= "PokerStars"):
208 """Add a file or directory for bulk import"""
209 filter = self
.config
.hhcs
[site
].converter
210 # Bulk import never monitors
211 # if directory, add all files in it. Otherwise add single file.
212 # TODO: only add sane files?
213 if os
.path
.isdir(inputPath
):
214 for subdir
in os
.walk(inputPath
):
215 for file in subdir
[2]:
216 self
.addImportFile(os
.path
.join(subdir
[0], file), site
=site
, filter=filter)
218 self
.addImportFile(inputPath
, site
=site
, filter=filter)
220 #Add a directory of files to filelist
221 #Only one import directory per site supported.
222 #dirlist is a hash of lists:
223 #dirlist{ 'PokerStars' => ["/path/to/import/", "filtername"] }
224 def addImportDirectory(self
,dir,monitor
=False, site
="default", filter="passthrough"):
225 #gets called by GuiAutoImport.
226 #This should really be using os.walk
227 #http://docs.python.org/library/os.html
228 if os
.path
.isdir(dir):
231 self
.dirlist
[site
] = [dir] + [filter]
233 #print "addImportDirectory: checking files in", dir
234 for file in os
.listdir(dir):
235 #print " adding file ", file
236 self
.addImportFile(os
.path
.join(dir, file), site
, filter)
238 log
.warning(_("Attempted to add non-directory '%s' as an import directory") % str(dir))
241 """"Run full import on self.filelist. This is called from GuiBulkImport.py"""
242 #if self.settings['forceThreads'] > 0: # use forceThreads until threading enabled in GuiBulkImport
243 # self.setThreads(self.settings['forceThreads'])
246 start
= datetime
.datetime
.now()
248 log
.info(_("Started at %s -- %d files to import. indexes: %s") % (start
, len(self
.filelist
), self
.settings
['dropIndexes']))
249 if self
.settings
['dropIndexes'] == 'auto':
250 self
.settings
['dropIndexes'] = self
.calculate_auto2(self
.database
, 12.0, 500.0)
251 if 'dropHudCache' in self
.settings
and self
.settings
['dropHudCache'] == 'auto':
252 self
.settings
['dropHudCache'] = self
.calculate_auto2(self
.database
, 25.0, 500.0) # returns "drop"/"don't drop"
254 if self
.settings
['dropIndexes'] == 'drop':
255 self
.database
.prepareBulkImport()
257 log
.info(_("No need to drop indexes."))
258 #print "dropInd =", self.settings['dropIndexes'], " dropHudCache =", self.settings['dropHudCache']
260 if self
.settings
['threads'] <= 0:
261 (totstored
, totdups
, totpartial
, toterrors
) = self
.importFiles(None)
263 # create queue (will probably change to deque at some point):
264 self
.writeq
= Queue
.Queue( self
.settings
['writeQSize'] )
265 # start separate thread(s) to read hands from queue and write to db:
266 for i
in xrange(self
.settings
['threads']):
267 t
= threading
.Thread( target
=self
.writerdbs
[i
].insert_queue_hands
268 , args
=(self
.writeq
, self
.settings
["writeQMaxWait"])
269 , name
="dbwriter-"+str(i
) )
272 # read hands and write to q:
273 (totstored
, totdups
, totpartial
, toterrors
) = self
.importFiles(self
.writeq
)
275 if self
.writeq
.empty():
276 print _("writers finished already")
279 print _("waiting for writers to finish ...")
280 #for t in threading.enumerate():
283 #using empty() might be more reliable:
284 while not self
.writeq
.empty() and len(threading
.enumerate()) > 1:
285 # TODO: Do we need to actually tell the progress indicator to move, or is it already moving, and we just need to process events...
286 while gtk
.events_pending(): # see http://faq.pygtk.org/index.py?req=index for more hints (3.7)
287 gtk
.main_iteration(False)
289 print _(" ... writers finished")
291 # Tidying up after import
292 if self
.settings
['dropIndexes'] == 'drop':
293 self
.database
.afterBulkImport()
295 log
.info (_("No need to rebuild indexes."))
296 if 'dropHudCache' in self
.settings
and self
.settings
['dropHudCache'] == 'drop':
297 self
.database
.rebuild_hudcache()
299 log
.info (_("No need to rebuild hudcache."))
300 self
.database
.analyzeDB()
302 return (totstored
, totdups
, totpartial
, toterrors
, endtime
-starttime
)
305 def importFiles(self
, q
):
306 """"Read filenames in self.filelist and pass to import_file_dict().
307 Uses a separate database connection if created as a thread (caller
308 passes None or no param as db)."""
316 #prepare progress popup window
317 ProgressDialog
= ProgressBar(len(self
.filelist
), self
.parent
)
319 for file in self
.filelist
:
321 ProgressDialog
.progress_update(file, str(self
.database
.getHandCount()))
323 (stored
, duplicates
, partial
, errors
, ttime
) = self
.import_file_dict(file, self
.filelist
[file][0]
324 ,self
.filelist
[file][1], self
.filelist
[file][2], q
)
326 totdups
+= duplicates
327 totpartial
+= partial
330 self
.logImport('bulk', file, stored
, duplicates
, partial
, errors
, ttime
, self
.filelist
[file][2])
331 self
.database
.commit()
334 for i
in xrange( self
.settings
['threads'] ):
335 print _("sending finish message queue length ="), q
.qsize()
336 db
.send_finish_msg(q
)
339 return (totstored
, totdups
, totpartial
, toterrors
)
340 # end def importFiles
343 def calculate_auto(self
, db
):
344 """An heuristic to determine a reasonable value of drop/don't drop"""
345 if len(self
.filelist
) == 1: return "don't drop"
346 if 'handsInDB' not in self
.settings
:
348 tmpcursor
= db
.get_cursor()
349 tmpcursor
.execute("Select count(1) from Hands;")
350 self
.settings
['handsInDB'] = tmpcursor
.fetchone()[0]
352 pass # if this fails we're probably doomed anyway
353 if self
.settings
['handsInDB'] < 5000: return "drop"
354 if len(self
.filelist
) < 50: return "don't drop"
355 if self
.settings
['handsInDB'] > 50000: return "don't drop"
358 def calculate_auto2(self
, db
, scale
, increment
):
359 """A second heuristic to determine a reasonable value of drop/don't drop
360 This one adds up size of files to import to guess number of hands in them
361 Example values of scale and increment params might be 10 and 500 meaning
362 roughly: drop if importing more than 10% (100/scale) of hands in db or if
363 less than 500 hands in db"""
364 size_per_hand
= 1300.0 # wag based on a PS 6-up FLHE file. Actual value not hugely important
365 # as values of scale and increment compensate for it anyway.
366 # decimal used to force float arithmetic
368 # get number of hands in db
369 if 'handsInDB' not in self
.settings
:
371 tmpcursor
= db
.get_cursor()
372 tmpcursor
.execute("Select count(1) from Hands;")
373 self
.settings
['handsInDB'] = tmpcursor
.fetchone()[0]
375 pass # if this fails we're probably doomed anyway
377 # add up size of import files
379 for file in self
.filelist
:
380 if os
.path
.exists(file):
381 stat_info
= os
.stat(file)
382 total_size
+= stat_info
.st_size
384 # if hands_in_db is zero or very low, we want to drop indexes, otherwise compare
385 # import size with db size somehow:
387 if self
.settings
['handsInDB'] < scale
* (total_size
/size_per_hand
) + increment
:
389 #print "auto2: handsindb =", self.settings['handsInDB'], "total_size =", total_size, "size_per_hand =", \
390 # size_per_hand, "inc =", increment, "return:", ret
393 #Run import on updated files, then store latest update time. Called from GuiAutoImport.py
394 def runUpdated(self
):
395 #Check for new files in monitored directories
396 #todo: make efficient - always checks for new file, should be able to use mtime of directory
397 # ^^ May not work on windows
399 #rulog = open('runUpdated.txt', 'a')
400 #rulog.writelines("runUpdated ... ")
401 for site
in self
.dirlist
:
402 self
.addImportDirectory(self
.dirlist
[site
][0], False, site
, self
.dirlist
[site
][1])
404 for file in self
.filelist
:
405 if os
.path
.exists(file):
406 stat_info
= os
.stat(file)
407 #rulog.writelines("path exists ")
408 if file in self
.updatedsize
: # we should be able to assume that if we're in size, we're in time as well
409 if stat_info
.st_size
> self
.updatedsize
[file] or stat_info
.st_mtime
> self
.updatedtime
[file]:
410 # print "file",file," updated", os.path.basename(file), stat_info.st_size, self.updatedsize[file], stat_info.st_mtime, self.updatedtime[file]
412 if not os
.path
.isdir(file):
413 self
.caller
.addText("\n"+os
.path
.basename(file))
414 except KeyError: # TODO: What error happens here?
416 (stored
, duplicates
, partial
, errors
, ttime
) = self
.import_file_dict(file, self
.filelist
[file][0]
417 ,self
.filelist
[file][1], self
.filelist
[file][2], None)
418 self
.logImport('auto', file, stored
, duplicates
, partial
, errors
, ttime
, self
.filelist
[file][2])
419 self
.database
.commit()
421 if not os
.path
.isdir(file): # Note: This assumes that whatever calls us has an "addText" func
422 self
.caller
.addText(" %d stored, %d duplicates, %d partial, %d errors (time = %f)" % (stored
, duplicates
, partial
, errors
, ttime
))
423 except KeyError: # TODO: Again, what error happens here? fix when we find out ..
425 self
.updatedsize
[file] = stat_info
.st_size
426 self
.updatedtime
[file] = time()
428 if os
.path
.isdir(file) or (time() - stat_info
.st_mtime
) < 60:
429 self
.updatedsize
[file] = 0
430 self
.updatedtime
[file] = 0
432 self
.updatedsize
[file] = stat_info
.st_size
433 self
.updatedtime
[file] = time()
435 self
.removeFromFileList
[file] = True
437 self
.addToDirList
= filter(lambda x
: self
.addImportDirectory(x
, True, self
.addToDirList
[x
][0], self
.addToDirList
[x
][1]), self
.addToDirList
)
439 for file in self
.removeFromFileList
:
440 if file in self
.filelist
:
441 del self
.filelist
[file]
443 self
.addToDirList
= {}
444 self
.removeFromFileList
= {}
445 self
.database
.rollback()
446 #rulog.writelines(" finished\n")
449 # This is now an internal function that should not be called directly.
450 def import_file_dict(self
, file, site
, filter, fileId
, q
=None):
452 if os
.path
.isdir(file):
453 self
.addToDirList
[file] = [site
] + [filter]
456 (stored
, duplicates
, partial
, errors
, ttime
) = (0, 0, 0, 0, time())
458 # Load filter, process file, pass returned filename to import_fpdb_file
459 if self
.settings
['threads'] > 0 and self
.writeq
is not None:
460 log
.info((_("Converting %s") % file) + " (" + str(q
.qsize()) + ")")
461 else: log
.info(_("Converting %s") % file)
463 filter_name
= filter.replace("ToFpdb", "")
464 mod
= __import__(filter)
465 obj
= getattr(mod
, filter_name
, None)
468 if file in self
.pos_in_file
: idx
= self
.pos_in_file
[file]
469 else: self
.pos_in_file
[file], idx
= 0, 0
471 hhc
= obj( self
.config
, in_path
= file, index
= idx
472 ,starsArchive
= self
.settings
['starsArchive']
473 ,ftpArchive
= self
.settings
['ftpArchive']
477 if self
.caller
: hhc
.progressNotify()
478 handlist
= hhc
.getProcessedHands()
479 self
.pos_in_file
[file] = hhc
.getLastCharacterRead()
480 (hbulk
, hpbulk
, habulk
, hcbulk
, phands
, ihands
, to_hud
) = ([], [], [], [], [], [], [])
481 sc
, gsc
= {'bk': []}, {'bk': []}
483 ####Lock Placeholder####
484 for hand
in handlist
:
485 hand
.prepInsert(self
.database
, printtest
= self
.settings
['testData'])
486 self
.database
.commit()
488 ####Lock Placeholder####
493 ####Lock Placeholder####
494 id = self
.database
.nextHandId()
495 for i
in range(len(phands
)):
496 doinsert
= len(phands
)==i
+1
499 id = hand
.getHandId(self
.database
, id)
500 sc
, gsc
= hand
.updateSessionsCache(self
.database
, sc
, gsc
, None, doinsert
)
501 hbulk
= hand
.insertHands(self
.database
, hbulk
, fileId
, doinsert
, self
.settings
['testData'])
502 hcbulk
= hand
.updateHudCache(self
.database
, hcbulk
, doinsert
)
504 to_hud
.append(hand
.dbid_hands
)
505 except Exceptions
.FpdbHandDuplicate
:
507 self
.database
.commit()
508 ####Lock Placeholder####
510 for i
in range(len(ihands
)):
511 doinsert
= len(ihands
)==i
+1
513 hpbulk
= hand
.insertHandsPlayers(self
.database
, hpbulk
, doinsert
, self
.settings
['testData'])
514 habulk
= hand
.insertHandsActions(self
.database
, habulk
, doinsert
, self
.settings
['testData'])
515 self
.database
.commit()
517 #pipe the Hands.id out to the HUD
521 print _("fpdb_import: sending hand to hud"), hid
, "pipe =", self
.caller
.pipe_to_hud
522 self
.caller
.pipe_to_hud
.stdin
.write("%s" % (hid
) + os
.linesep
)
524 log
.error(_("Failed to send hand to HUD: %s") % e
)
526 errors
= getattr(hhc
, 'numErrors')
527 stored
= getattr(hhc
, 'numHands')
530 # Really ugly hack to allow testing Hands within the HHC from someone
531 # with only an Importer objec
532 if self
.settings
['cacheHHC']:
533 self
.handhistoryconverter
= hhc
535 # conversion didn't work
536 # TODO: appropriate response?
537 return (0, 0, 0, 1, time() - ttime
)
539 log
.warning(_("Unknown filter filter_name:'%s' in filter:'%s'") %(filter_name
, filter))
540 return (0, 0, 0, 1, time() - ttime
)
542 ttime
= time() - ttime
544 #This will barf if conv.getStatus != True
545 return (stored
, duplicates
, partial
, errors
, ttime
)
548 def printEmailErrorMessage(self
, errors
, filename
, line
):
549 traceback
.print_exc(file=sys
.stderr
)
550 print (_("Error No.%s please send the hand causing this to fpdb-main@lists.sourceforge.net so we can fix the problem.") % errors
)
551 print _("Filename:"), filename
552 print _("Here is the first line of the hand so you can identify it. Please mention that the error was a ValueError:")
554 print _("Hand logged to hand-errors.txt")
555 logfile
= open('hand-errors.txt', 'a')
557 logfile
.write(str(s
) + "\n")
565 Popup window to show progress
567 Init method sets up total number of expected iterations
568 If no parent is passed to init, command line
569 mode assumed, and does not create a progress bar
575 self
.progress
.destroy()
578 def progress_update(self
, file, handcount
):
585 #update sum if fraction exceeds expected total number of iterations
586 if self
.fraction
> self
.sum:
589 #progress bar total set to 1 plus the number of items,to prevent it
590 #reaching 100% prior to processing fully completing
592 progress_percent
= float(self
.fraction
) / (float(self
.sum) + 1.0)
593 progress_text
= (self
.title
+ " "
594 + str(self
.fraction
) + " / " + str(self
.sum))
596 self
.pbar
.set_fraction(progress_percent
)
597 self
.pbar
.set_text(progress_text
)
599 self
.handcount
.set_text(_("Database Statistics") + " - " + _("Number of Hands: ") + handcount
)
601 now
= datetime
.datetime
.now()
602 now_formatted
= now
.strftime("%H:%M:%S")
603 self
.progresstext
.set_text(now_formatted
+ " - "+self
.title
+ " " +file+"\n")
606 def __init__(self
, sum, parent
):
610 #no parent is passed, assume this is being run from the
611 #command line, so return immediately
616 self
.title
= _("Importing")
618 self
.progress
= gtk
.Window(gtk
.WINDOW_TOPLEVEL
)
619 self
.progress
.set_size_request(500,150)
621 self
.progress
.set_resizable(False)
622 self
.progress
.set_modal(True)
623 self
.progress
.set_transient_for(self
.parent
)
624 self
.progress
.set_decorated(True)
625 self
.progress
.set_deletable(False)
626 self
.progress
.set_title(self
.title
)
628 vbox
= gtk
.VBox(False, 5)
629 vbox
.set_border_width(10)
630 self
.progress
.add(vbox
)
633 align
= gtk
.Alignment(0, 0, 0, 0)
634 vbox
.pack_start(align
, False, True, 2)
637 self
.pbar
= gtk
.ProgressBar()
641 align
= gtk
.Alignment(0, 0, 0, 0)
642 vbox
.pack_start(align
, False, True, 2)
645 self
.handcount
= gtk
.Label()
646 align
.add(self
.handcount
)
647 self
.handcount
.show()
649 align
= gtk
.Alignment(0, 0, 0, 0)
650 vbox
.pack_start(align
, False, True, 0)
653 self
.progresstext
= gtk
.Label()
654 self
.progresstext
.set_line_wrap(True)
655 self
.progresstext
.set_selectable(True)
656 align
.add(self
.progresstext
)
657 self
.progresstext
.show()
# Entry-point guard: this module is a library; running it directly just
# points the user at the GuiBulkImport CLI.  (Python 2 print statement.)
662 if __name__
== "__main__":
663 print _("CLI for importing hands is GuiBulkImport.py")