2 # -*- coding: utf-8 -*-
4 #Copyright 2008-2011 Steffen Schaumburg
5 #This program is free software: you can redistribute it and/or modify
6 #it under the terms of the GNU Affero General Public License as published by
7 #the Free Software Foundation, version 3 of the License.
9 #This program is distributed in the hope that it will be useful,
10 #but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 #GNU General Public License for more details.
14 #You should have received a copy of the GNU Affero General Public License
15 #along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #In the "official" distribution you can find the license in agpl-3.0.txt.
# Install the gettext translation function "_" used for all user-visible
# strings in this module.
# NOTE(review): the "import L10n" line is not visible in this chunk.
_ = L10n.get_translation()
21 # Standard Library modules
23 import os
# todo: remove this once import_dir is in fpdb_import
25 from time
import time
, strftime
, sleep
, clock
31 from collections
import deque
# using Queue for now
40 # fpdb/FreePokerTools modules
# When run directly, send log output to a local file; otherwise the host
# application has already configured logging.
if __name__ == "__main__":
    Configuration.set_logfile("fpdb-log.txt")
# logging has been set up in fpdb.py or HUD_main.py, use their settings:
log = logging.getLogger("importer")
    def __init__(self, caller, settings, config, sql = None, parent = None):
        """Importer: tracks files/directories to import and writes hands to the DB.

        caller   -- object providing GUI hooks (addText, pipe_to_hud); may be None
        settings -- dict of import options; defaults are filled in below
        config   -- fpdb Configuration object
        sql      -- optional SQL helper passed through to Database
        parent   -- optional parent window for the progress dialog

        NOTE(review): several original lines are missing from this chunk --
        the assignments of self.caller/self.config/self.sql/self.parent,
        self.filelist/self.dirlist/self.siteIds/self.updatedsize/
        self.updatedtime and self.writerdbs are not visible, although they
        are read below and elsewhere in the class.
        """
        self.settings = settings
        self.addToDirList = {}
        self.removeFromFileList = {} # to remove deleted files
        self.faobs = None # File as one big string
        self.pos_in_file = {} # dict to remember how far we have read in the file
        self.callHud = self.config.get_import_parameters().get("callFpdbHud")

        # CONFIGURATION OPTIONS
        self.settings.setdefault("handCount", 0)
        #self.settings.setdefault("allowHudcacheRebuild", True) # NOT USED NOW
        #self.settings.setdefault("forceThreads", 2) # NOT USED NOW
        self.settings.setdefault("writeQSize", 1000) # no need to change
        self.settings.setdefault("writeQMaxWait", 10) # not used
        self.settings.setdefault("dropIndexes", "don't drop")
        self.settings.setdefault("dropHudCache", "don't drop")
        self.settings.setdefault("starsArchive", False)
        self.settings.setdefault("ftpArchive", False)
        self.settings.setdefault("testData", False)
        self.settings.setdefault("cacheHHC", False)

        # Main DB connection, plus one dedicated connection per writer thread.
        self.database = Database.Database(self.config, sql = self.sql)
        self.settings.setdefault("threads", 1) # value set by GuiBulkImport
        for i in xrange(self.settings['threads']):
            self.writerdbs.append( Database.Database(self.config, sql = self.sql) )

        clock() # init clock in windows
    def setCallHud(self, value):
        """Enable/disable forwarding of imported hands to the HUD.

        NOTE(review): the method body (original line 97, presumably
        ``self.callHud = value``) is missing from this chunk; only the
        signature is visible.
        """
99 def setCacheSessions(self
, value
):
100 self
.cacheSessions
= value
102 def setHandCount(self
, value
):
103 self
.settings
['handCount'] = int(value
)
105 def setQuiet(self
, value
):
106 self
.settings
['quiet'] = value
108 def setFailOnError(self
, value
):
109 self
.settings
['failOnError'] = value
111 def setHandsInDB(self
, value
):
112 self
.settings
['handsInDB'] = value
114 def setThreads(self
, value
):
115 self
.settings
['threads'] = value
116 if self
.settings
["threads"] > len(self
.writerdbs
):
117 for i
in xrange(self
.settings
['threads'] - len(self
.writerdbs
)):
118 self
.writerdbs
.append( Database
.Database(self
.config
, sql
= self
.sql
) )
120 def setDropIndexes(self
, value
):
121 self
.settings
['dropIndexes'] = value
123 def setDropHudCache(self
, value
):
124 self
.settings
['dropHudCache'] = value
126 def setStarsArchive(self
, value
):
127 self
.settings
['starsArchive'] = value
129 def setFTPArchive(self
, value
):
130 self
.settings
['ftpArchive'] = value
132 def setPrintTestData(self
, value
):
133 self
.settings
['testData'] = value
135 def setFakeCacheHHC(self
, value
):
136 self
.settings
['cacheHHC'] = value
138 def getCachedHHC(self
):
139 return self
.handhistoryconverter
141 # def setWatchTime(self):
142 # self.updated = time()
    def clearFileList(self):
        """Reset the per-file bookkeeping used by the auto-import loop.

        NOTE(review): original lines 146 and 148-149 are missing from this
        chunk -- presumably the resets of self.updatedtime and self.filelist.
        """
        self.updatedsize = {}
        self.pos_in_file = {}
150 def logImport(self
, type, file, stored
, dups
, partial
, errs
, ttime
, id):
151 hands
= stored
+ dups
+ partial
+ errs
152 now
= datetime
.datetime
.utcnow()
153 ttime100
= ttime
* 100
154 self
.database
.updateFile([type, now
, now
, hands
, stored
, dups
, partial
, errs
, ttime100
, True, id])
155 self
.database
.commit()
    def addFileToList(self, file, site, filter):
        """Register *file* in the Files DB table; return [site, filter, id]."""
        now = datetime.datetime.utcnow()
        # DB stores the bare file stem: strip directory and extension.
        file = os.path.splitext(os.path.basename(file))[0]
        try: #TODO: this is a dirty hack. GBI needs it, GAI fails with it.
            file = unicode(file, "utf8", "replace")
        # NOTE(review): the matching except clause (original lines 162-163)
        # is missing from this chunk.
        id = self.database.storeFile([file, site, now, now, 0, 0, 0, 0, 0, 0, False])
        self.database.commit()
        return [site] + [filter] + [id]
    #Add an individual file to filelist
    def addImportFile(self, filename, site = "default", filter = "passthrough"):
        """Add one file to self.filelist and ensure its site id is cached."""
        #TODO: test it is a valid file -> put that in config!!
        #print "addimportfile: filename is a", filename.__class__
        # filename not guaranteed to be unicode
        if filename in self.filelist or not os.path.exists(filename):
            # NOTE(review): the body of this guard (original line 174,
            # presumably "return") is missing from this chunk.
        self.filelist[filename] = self.addFileToList(filename, site, filter)
        if site not in self.siteIds:
            # Get id from Sites table in DB
            result = self.database.get_site_id(site)
            # NOTE(review): original lines 179, 181-182 and 184 (the
            # len(result) checks selecting among these three branches) are
            # missing from this chunk.
            self.siteIds[site] = result[0][0]
            log.error(_("Database ID for %s not found") % site)
            log.error(_("More than 1 Database ID found for %s") % site)
    # Called from GuiBulkImport to add a file or directory. Bulk import never monitors
    def addBulkImportImportFileOrDir(self, inputPath, site = "PokerStars"):
        #for windows platform, force os.walk variable to be unicode
        # see fpdb-main post 9th July 2011
        if self.config.posix:
            # NOTE(review): original lines 195-196 are missing between the
            # guard and this statement.
            inputPath = unicode(inputPath)
        """Add a file or directory for bulk import"""
        # Look up the converter class name configured for this site.
        filter = self.config.hhcs[site].converter
        # TODO: only add sane files?
        if os.path.isdir(inputPath):
            # Recurse the tree; subdir[2] is the filename list from os.walk.
            for subdir in os.walk(inputPath):
                for file in subdir[2]:
                    self.addImportFile(os.path.join(subdir[0], file), site=site, filter=filter)
        # NOTE(review): an "else:" (original line 206) is missing before the
        # single-file branch below.
            self.addImportFile(inputPath, site=site, filter=filter)
    #Add a directory of files to filelist
    #Only one import directory per site supported.
    #dirlist is a hash of lists:
    #dirlist{ 'PokerStars' => ["/path/to/import/", "filtername"] }
    def addImportDirectory(self, dir, monitor=False, site="default", filter="passthrough"):
        """Add every file in *dir* (non-recursively) to the import file list."""
        #gets called by GuiAutoImport.
        #This should really be using os.walk
        #http://docs.python.org/library/os.html
        if os.path.isdir(dir):
            # NOTE(review): original lines 218-219 (the "if monitor:" handling
            # that would use the monitor parameter) are missing from this chunk.
            self.dirlist[site] = [dir] + [filter]

            #print "addImportDirectory: checking files in", dir
            for file in os.listdir(dir):
                #print " adding file ", file
                self.addImportFile(os.path.join(dir, file), site, filter)
        # NOTE(review): an "else:" (original line 226) is missing before the
        # warning below.
            log.warning(_("Attempted to add non-directory '%s' as an import directory") % str(dir))
    """"Run full import on self.filelist. This is called from GuiBulkImport.py"""
    # NOTE(review): the "def runImport(self):" line (original ~line 231) is
    # missing from this chunk; the statements below are its body.
        start = datetime.datetime.now()
        log.info(_("Started at %s -- %d files to import. indexes: %s") % (start, len(self.filelist), self.settings['dropIndexes']))
        # Resolve 'auto' policies against current DB size before importing.
        if self.settings['dropIndexes'] == 'auto':
            self.settings['dropIndexes'] = self.calculate_auto2(self.database, 12.0, 500.0)
        if 'dropHudCache' in self.settings and self.settings['dropHudCache'] == 'auto':
            self.settings['dropHudCache'] = self.calculate_auto2(self.database, 25.0, 500.0) # returns "drop"/"don't drop"

        # threads <= 0 means import synchronously with no writer queue.
        if self.settings['threads'] <= 0:
            (totstored, totdups, totpartial, toterrors) = self.importFiles(None)
        # NOTE(review): an "else:" (original line 243) is missing before the
        # threaded branch below.
            # create queue (will probably change to deque at some point):
            self.writeq = Queue.Queue( self.settings['writeQSize'] )
            # start separate thread(s) to read hands from queue and write to db:
            for i in xrange(self.settings['threads']):
                t = threading.Thread( target=self.writerdbs[i].insert_queue_hands
                                    , args=(self.writeq, self.settings["writeQMaxWait"])
                                    , name="dbwriter-"+str(i) )
                # NOTE(review): original lines 251-252 (presumably
                # t.setDaemon(True); t.start()) are missing.
            # read hands and write to q:
            (totstored, totdups, totpartial, toterrors) = self.importFiles(self.writeq)

            if self.writeq.empty():
                print _("writers finished already")
            # NOTE(review): an "else:" (original lines 258-259) is missing
            # before the wait loop below.
                print _("waiting for writers to finish ...")
                #for t in threading.enumerate():
                #using empty() might be more reliable:
                while not self.writeq.empty() and len(threading.enumerate()) > 1:
                    # TODO: Do we need to actually tell the progress indicator to move, or is it already moving, and we just need to process events...
                    while gtk.events_pending(): # see http://faq.pygtk.org/index.py?req=index for more hints (3.7)
                        gtk.main_iteration(False)
                print _("... writers finished")

        # NOTE(review): endtime/starttime are never assigned in the visible
        # code (original lines 232/271-272 are missing).
        return (totstored, totdups, totpartial, toterrors, endtime-starttime)
    def importFiles(self, q):
        """"Read filenames in self.filelist and pass to import_file_dict().
        Uses a separate database connection if created as a thread (caller
        passes None or no param as db)."""
        # NOTE(review): original lines 280-287 (initialisation of totstored/
        # totdups/totpartial/toterrors/filecount/fileerrorcount) are missing
        # from this chunk, although they are used below.
        moveimportedfiles = False #TODO need to wire this into GUI and make it prettier
        movefailedfiles = False #TODO and this too

        #prepare progress popup window
        ProgressDialog = ProgressBar(len(self.filelist), self.parent)

        for file in self.filelist:
            filecount = filecount + 1
            ProgressDialog.progress_update(file, str(self.database.getHandCount()))

            if not moveimportedfiles and not movefailedfiles:
                (stored, duplicates, partial, errors, ttime) = self.import_file_dict(file, self.filelist[file][0]
                                               ,self.filelist[file][1], self.filelist[file][2], q)
                # NOTE(review): "totstored += stored" (original line 302) and
                # toterrors accumulation are missing here.
                totdups += duplicates
                totpartial += partial
            # NOTE(review): original lines 305-307 (the "else:"/"try:"
            # introducing the move-files variant) are missing.
                (stored, duplicates, partial, errors, ttime) = self.import_file_dict(file, self.filelist[file][0]
                                               ,self.filelist[file][1], self.filelist[file][2], q)
                totdups += duplicates
                totpartial += partial

                if moveimportedfiles:
                    shutil.move(file, "c:\\fpdbimported\\%d-%s" % (filecount, os.path.basename(file[3:]) ) )
                # NOTE(review): original lines 316 and 318 (except clause and
                # "if movefailedfiles:") are missing around these two lines.
                    fileerrorcount = fileerrorcount + 1
                    shutil.move(file, "c:\\fpdbfailed\\%d-%s" % (fileerrorcount, os.path.basename(file[3:]) ) )

            self.logImport('bulk', file, stored, duplicates, partial, errors, ttime, self.filelist[file][2])

        # Tidying up after import
        if 'dropHudCache' in self.settings and self.settings['dropHudCache'] == 'drop':
            self.database.rebuild_hudcache()
        # NOTE(review): an "else:" (original line 326) is missing before the
        # no-rebuild branch below.
            self.database.cleanUpTourneyTypes()
            self.database.resetttclean()
            log.info (_("No need to rebuild hudcache."))
        self.database.analyzeDB()
        self.database.commit()
        # NOTE(review): original lines 332-334 are missing; "db" below is not
        # defined anywhere in the visible code.
        for i in xrange( self.settings['threads'] ):
            #print ("sending finish message queue length ="), q.qsize()
            db.send_finish_msg(q)

        return (totstored, totdups, totpartial, toterrors)
    # end def importFiles
    def calculate_auto(self, db):
        """An heuristic to determine a reasonable value of drop/don't drop"""
        if len(self.filelist) == 1: return "don't drop"
        if 'handsInDB' not in self.settings:
            # NOTE(review): a "try:" (original line 348) is missing here.
            tmpcursor = db.get_cursor()
            tmpcursor.execute("Select count(1) from Hands;")
            self.settings['handsInDB'] = tmpcursor.fetchone()[0]
            # NOTE(review): an "except:" (original line 352) is missing here.
            pass # if this fails we're probably doomed anyway
        if self.settings['handsInDB'] < 5000: return "drop"
        if len(self.filelist) < 50: return "don't drop"
        if self.settings['handsInDB'] > 50000: return "don't drop"
        # NOTE(review): the final fallback return (original lines 357-358)
        # is missing from this chunk.
    def calculate_auto2(self, db, scale, increment):
        """A second heuristic to determine a reasonable value of drop/don't drop
        This one adds up size of files to import to guess number of hands in them
        Example values of scale and increment params might be 10 and 500 meaning
        roughly: drop if importing more than 10% (100/scale) of hands in db or if
        less than 500 hands in db"""
        size_per_hand = 1300.0 # wag based on a PS 6-up FLHE file. Actual value not hugely important
                               # as values of scale and increment compensate for it anyway.
                               # decimal used to force float arithmetic

        # get number of hands in db
        if 'handsInDB' not in self.settings:
            # NOTE(review): a "try:" (original line 371) is missing here.
            tmpcursor = db.get_cursor()
            tmpcursor.execute("Select count(1) from Hands;")
            self.settings['handsInDB'] = tmpcursor.fetchone()[0]
            # NOTE(review): an "except:" (original line 375) is missing here.
            pass # if this fails we're probably doomed anyway

        # add up size of import files
        # NOTE(review): the "total_size = 0" initialisation (original line
        # 379) is missing from this chunk.
        for file in self.filelist:
            if os.path.exists(file):
                stat_info = os.stat(file)
                total_size += stat_info.st_size

        # if hands_in_db is zero or very low, we want to drop indexes, otherwise compare
        # import size with db size somehow:
        if self.settings['handsInDB'] < scale * (total_size/size_per_hand) + increment:
            # NOTE(review): the ret assignments (original lines 387/389) and
            # the final return (line 392) are missing from this chunk.
        #print "auto2: handsindb =", self.settings['handsInDB'], "total_size =", total_size, "size_per_hand =", \
        #      size_per_hand, "inc =", increment, "return:", ret
    #Run import on updated files, then store latest update time. Called from GuiAutoImport.py
    def runUpdated(self):
        """Re-scan monitored directories and import files that grew or changed."""
        #Check for new files in monitored directories
        #todo: make efficient - always checks for new file, should be able to use mtime of directory
        # ^^ May not work on windows

        #rulog = open('runUpdated.txt', 'a')
        #rulog.writelines("runUpdated ... ")
        for site in self.dirlist:
            self.addImportDirectory(self.dirlist[site][0], False, site, self.dirlist[site][1])

        for file in self.filelist:
            if os.path.exists(file):
                stat_info = os.stat(file)
                #rulog.writelines("path exists ")
                if file in self.updatedsize: # we should be able to assume that if we're in size, we're in time as well
                    if stat_info.st_size > self.updatedsize[file] or stat_info.st_mtime > self.updatedtime[file]:
                        # print "file",file," updated", os.path.basename(file), stat_info.st_size, self.updatedsize[file], stat_info.st_mtime, self.updatedtime[file]
                        # NOTE(review): a "try:" (original line 412) is
                        # missing before this guard; the "except" below has
                        # no visible opening try.
                        if not os.path.isdir(file):
                            self.caller.addText("\n"+os.path.basename(file))
                        except KeyError: # TODO: What error happens here?
                        # NOTE(review): the except body (original line 416,
                        # presumably "pass") is missing.
                        (stored, duplicates, partial, errors, ttime) = self.import_file_dict(file, self.filelist[file][0]
                                           ,self.filelist[file][1], self.filelist[file][2], None)
                        self.logImport('auto', file, stored, duplicates, partial, errors, ttime, self.filelist[file][2])
                        self.database.commit()
                        # NOTE(review): another "try:" (original line 421) is
                        # missing before this guard.
                        if not os.path.isdir(file): # Note: This assumes that whatever calls us has an "addText" func
                            self.caller.addText(" %d stored, %d duplicates, %d partial, %d errors (time = %f)" % (stored, duplicates, partial, errors, ttime))
                        except KeyError: # TODO: Again, what error happens here? fix when we find out ..
                        # NOTE(review): the except body (original line 425) is
                        # missing.
                        self.updatedsize[file] = stat_info.st_size
                        self.updatedtime[file] = time()
                # NOTE(review): an "else:" (original line 428) is missing --
                # the branch below handles files not yet in updatedsize.
                    if os.path.isdir(file) or (time() - stat_info.st_mtime) < 60:
                        # Fresh or still-changing file: force a re-check next pass.
                        self.updatedsize[file] = 0
                        self.updatedtime[file] = 0
                    # NOTE(review): an "else:" (original line 432) is missing
                    # before the two lines below.
                        self.updatedsize[file] = stat_info.st_size
                        self.updatedtime[file] = time()
            # NOTE(review): an "else:" (original line 435) is missing --
            # vanished files are queued for removal below.
                self.removeFromFileList[file] = True

        self.addToDirList = filter(lambda x: self.addImportDirectory(x, True, self.addToDirList[x][0], self.addToDirList[x][1]), self.addToDirList)

        for file in self.removeFromFileList:
            if file in self.filelist:
                del self.filelist[file]

        self.addToDirList = {}
        self.removeFromFileList = {}
        self.database.rollback()
        #rulog.writelines(" finished\n")
    # This is now an internal function that should not be called directly.
    def import_file_dict(self, file, site, filter, fileId, q=None):
        """Convert one file with its site converter and insert the hands.

        Returns (stored, duplicates, partial, errors, ttime).
        NOTE(review): many structural lines are missing from this chunk (the
        gaps are flagged inline); the reconstruction below preserves only the
        visible statements.
        """
        if os.path.isdir(file):
            self.addToDirList[file] = [site] + [filter]
            # NOTE(review): the early return (original line 455) is missing.

        (stored, duplicates, partial, errors, ttime) = (0, 0, 0, 0, time())

        # Load filter, process file, pass returned filename to import_fpdb_file
        if self.settings['threads'] > 0 and self.writeq is not None:
            log.info((_("Converting %s") % file) + " (" + str(q.qsize()) + ")")
        else: log.info(_("Converting %s") % file)

        # Dynamically import the converter module and fetch its class.
        filter_name = filter.replace("ToFpdb", "")
        mod = __import__(filter)
        obj = getattr(mod, filter_name, None)
        # NOTE(review): the "if callable(obj):" guard (original lines
        # 467-468) is missing; the final "Unknown filter name" branch below
        # belongs to it.

        # Resume from the last read position for previously-seen files.
        if file in self.pos_in_file: idx = self.pos_in_file[file]
        else: self.pos_in_file[file], idx = 0, 0

        hhc = obj( self.config, in_path = file, index = idx
                  ,starsArchive = self.settings['starsArchive']
                  ,ftpArchive = self.settings['ftpArchive']
        # NOTE(review): the closing arguments/parenthesis of this constructor
        # call (original lines 475-477) are missing.

        if self.caller: hhc.progressNotify()
        handlist = hhc.getProcessedHands()
        self.pos_in_file[file] = hhc.getLastCharacterRead()
        (phands, ihands, to_hud) = ([], [], [])
        self.database.resetBulkCache()

        ####Lock Placeholder####
        for hand in handlist:
            hand.prepInsert(self.database, printtest = self.settings['testData'])
        self.database.commit()
        ####Lock Placeholder####
        # NOTE(review): original lines 490-493 (presumably filling phands)
        # are missing.

        ####Lock Placeholder####
        id = self.database.nextHandId()
        sctimer, ihtimer, hctimer = 0,0,0
        for i in range(len(phands)):
            doinsert = len(phands)==i+1
            # NOTE(review): original lines 499-500 ("hand = phands[i]" and a
            # "try:") are missing; stime is never assigned in visible code.
            id = hand.getHandId(self.database, id)
            hand.updateSessionsCache(self.database, None, doinsert)
            sctimer += time() - stime
            hand.insertHands(self.database, fileId, doinsert, self.settings['testData'])
            ihtimer = time() - stime
            hand.updateHudCache(self.database, doinsert)
            hctimer = time() - stime
            # NOTE(review): the ihands.append (original line 511) is missing.
            to_hud.append(hand.dbid_hands)
            except Exceptions.FpdbHandDuplicate:
            # NOTE(review): the duplicate counter increment (original line
            # 514) is missing from the except body.
            #If last hand in the file is a duplicate this will backtrack and insert the new hand records
            if (doinsert and ihands):
                # Temporarily strip this duplicate hand's player rows, redo
                # the batch inserts, then restore them.
                hp = hand.handsplayers
                hand.hero, self.database.hbulk, hand.handsplayers = 0, self.database.hbulk[:-1], [] #making sure we don't insert data from this hand
                hand.updateSessionsCache(self.database, None, doinsert)
                hand.insertHands(self.database, fileId, doinsert, self.settings['testData'])
                hand.updateHudCache(self.database, doinsert)
                hand.handsplayers = hp
        #log.debug("DEBUG: hand.updateSessionsCache: %s" % (t5tot))
        #log.debug("DEBUG: hand.insertHands: %s" % (t6tot))
        #log.debug("DEBUG: hand.updateHudCache: %s" % (t7tot))
        self.database.commit()
        ####Lock Placeholder####

        for i in range(len(ihands)):
            doinsert = len(ihands)==i+1
            # NOTE(review): "hand = ihands[i]" (original line 532) is missing.
            hand.insertHandsPlayers(self.database, doinsert, self.settings['testData'])
            hand.insertHandsActions(self.database, doinsert, self.settings['testData'])
            hand.insertHandsStove(self.database, doinsert)
        self.database.commit()

        #pipe the Hands.id out to the HUD
        # NOTE(review): original lines 539-541 (the callHud guard, the loop
        # over to_hud binding "hid", and a "try:") are missing; likewise the
        # "except IOError" style line 544 before the log.error below.
        print _("fpdb_import: sending hand to hud"), hid, "pipe =", self.caller.pipe_to_hud
        self.caller.pipe_to_hud.stdin.write("%s" % (hid) + os.linesep)
        log.error(_("Failed to send hand to HUD: %s") % e)

        # Pull summary counters off the converter.
        partial = getattr(hhc, 'numPartial')
        errors = getattr(hhc, 'numErrors')
        stored = getattr(hhc, 'numHands')
        # NOTE(review): original lines 550-552 (stored/dup adjustment) are
        # missing.
        # Really ugly hack to allow testing Hands within the HHC from someone
        # with only an Importer objec
        if self.settings['cacheHHC']:
            self.handhistoryconverter = hhc
        # NOTE(review): an "else:" / error branch (original line 557) is
        # missing before the failure returns below.
        # conversion didn't work
        # TODO: appropriate response?
        return (0, 0, 0, 1, time() - ttime)
        log.warning(_("Unknown filter name %s in filter %s.") %(filter_name, filter))
        return (0, 0, 0, 1, time() - ttime)

        ttime = time() - ttime

        #This will barf if conv.getStatus != True
        return (stored, duplicates, partial, errors, ttime)
    """
    Popup window to show progress

    Init method sets up total number of expected iterations
    If no parent is passed to init, command line
    mode assumed, and does not create a progress bar
    """
    # NOTE(review): the "class ProgressBar:" header and the method definition
    # enclosing the statement below (likely a progress_end/destroy method)
    # are missing from this chunk.
        self.progress.destroy()
    def progress_update(self, file, handcount):
        """Advance the progress display for *file* and refresh the labels."""
        # NOTE(review): original lines 587-592 (the no-parent early return and
        # the increment of self.fraction) are missing from this chunk.
        #update sum if fraction exceeds expected total number of iterations
        if self.fraction > self.sum:
            # NOTE(review): the body (original line 595, presumably
            # "self.sum = self.fraction") is missing.

        #progress bar total set to 1 plus the number of items,to prevent it
        #reaching 100% prior to processing fully completing
        progress_percent = float(self.fraction) / (float(self.sum) + 1.0)
        progress_text = (self.title + " "
                         + str(self.fraction) + " / " + str(self.sum))

        self.pbar.set_fraction(progress_percent)
        self.pbar.set_text(progress_text)

        self.handcount.set_text(_("Database Statistics") + " - " + _("Number of Hands:") + " " + handcount)

        now = datetime.datetime.now()
        now_formatted = now.strftime("%H:%M:%S")
        self.progresstext.set_text(now_formatted + " - "+self.title+ " " +file+"\n")
    def __init__(self, sum, parent):
        """Build the GTK progress window; *sum* is the expected iteration count.

        NOTE(review): original lines 615-623 are missing from this chunk --
        presumably the assignments of self.parent/self.fraction/self.sum and
        the command-line early-return guard described below.
        """
        #no parent is passed, assume this is being run from the
        #command line, so return immediately

        self.title = _("Importing")

        self.progress = gtk.Window(gtk.WINDOW_TOPLEVEL)
        self.progress.set_size_request(500,150)
        self.progress.set_resizable(False)
        self.progress.set_modal(True)
        self.progress.set_transient_for(self.parent)
        self.progress.set_decorated(True)
        self.progress.set_deletable(False)
        self.progress.set_title(self.title)

        vbox = gtk.VBox(False, 5)
        vbox.set_border_width(10)
        self.progress.add(vbox)

        align = gtk.Alignment(0, 0, 0, 0)
        vbox.pack_start(align, False, True, 2)

        self.pbar = gtk.ProgressBar()
        # NOTE(review): original lines 646-647 (presumably align.add(self.pbar)
        # and self.pbar.show()) are missing.

        align = gtk.Alignment(0, 0, 0, 0)
        vbox.pack_start(align, False, True, 2)

        self.handcount = gtk.Label()
        align.add(self.handcount)
        self.handcount.show()

        align = gtk.Alignment(0, 0, 0, 0)
        vbox.pack_start(align, False, True, 0)

        self.progresstext = gtk.Label()
        self.progresstext.set_line_wrap(True)
        self.progresstext.set_selectable(True)
        align.add(self.progresstext)
        self.progresstext.show()
        # NOTE(review): the final self.progress.show() (original line ~667)
        # is missing from this chunk.
# This module is a library; point command-line users at the bulk-import GUI.
if __name__ == "__main__":
    print _("CLI for importing hands is GuiBulkImport.py")