4 Real-time log file watcher supporting log rotation.
6 Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
7 Martin Langhoff <martin.langhoff@remote-learner.net>
17 class LogWatcher(object):
18 """Looks for changes in all files of a directory.
19 This is useful for watching log file changes in real-time.
20 It also supports files rotation.
24 >>> def callback(filename, lines):
25 ... print filename, lines
27 >>> l = LogWatcher("/var/log/", callback)
31 def __init__(self
, folder
, callback
, extensions
=["log"], tail_lines
=0):
38 a function which is called every time a new line in a
39 file being watched is found;
40 this is called with "filename" and "lines" arguments.
43 only watch files with these extensions
46 read last N lines from files being watched before starting
49 self
.callback
= callback
50 self
.folder
= os
.path
.realpath(folder
)
51 self
.extensions
= extensions
52 assert os
.path
.isdir(self
.folder
), "%s does not exists" \
54 assert callable(callback
)
56 # The first time we run the script we move all file markers at EOF.
57 # In case of files created afterwards we don't do this.
58 for id, file in self
.files_map
.iteritems():
59 file.seek(os
.path
.getsize(file.name
)) # EOF
61 lines
= self
.tail(file.name
, tail_lines
)
63 self
.callback(file.name
, lines
)
68 def loop(self
, interval
=0.1, async=False):
70 If async is True make one loop then return.
74 for fid
, file in list(self
.files_map
.iteritems()):
81 """Log when a file is un/watched"""
85 """List directory and filter files by extension.
86 You may want to override this to add extra logic or
89 ls
= os
.listdir(self
.folder
)
91 return [x
for x
in ls
if os
.path
.splitext(x
)[1][1:] \
97 def tail(fname
, window
):
98 """Read last N lines from file fname."""
102 if err
.errno
== errno
.ENOENT
:
108 f
.seek(0, os
.SEEK_END
)
114 step
= (block
* BUFSIZ
)
115 if abs(step
) >= fsize
:
119 f
.seek(step
, os
.SEEK_END
)
120 data
= f
.read().strip()
121 if data
.count('\n') >= window
:
125 return data
.splitlines()[-window
:]
127 def update_files(self
):
129 for name
in self
.listdir():
130 absname
= os
.path
.realpath(os
.path
.join(self
.folder
, name
))
132 st
= os
.stat(absname
)
133 except EnvironmentError, err
:
134 if err
.errno
!= errno
.ENOENT
:
137 if not stat
.S_ISREG(st
.st_mode
):
139 fid
= self
.get_file_id(st
)
140 ls
.append((fid
, absname
))
142 # check existent files
143 for fid
, file in list(self
.files_map
.iteritems()):
145 st
= os
.stat(file.name
)
146 except EnvironmentError, err
:
147 if err
.errno
== errno
.ENOENT
:
148 self
.unwatch(file, fid
)
152 if fid
!= self
.get_file_id(st
):
153 # same name but different file (rotation); reload it.
154 self
.unwatch(file, fid
)
155 self
.watch(file.name
)
158 for fid
, fname
in ls
:
159 if fid
not in self
.files_map
:
162 def readfile(self
, file):
163 lines
= file.readlines()
165 self
.callback(file.name
, lines
)
167 def watch(self
, fname
):
169 file = open(fname
, "r")
170 fid
= self
.get_file_id(os
.stat(fname
))
171 except EnvironmentError, err
:
172 if err
.errno
!= errno
.ENOENT
:
175 self
.log("watching logfile %s" % fname
)
176 self
.files_map
[fid
] = file
178 def unwatch(self
, file, fid
):
179 # file no longer exists; if it has been renamed
180 # try to read it for the last time in case the
181 # log rotator has written something in it.
182 lines
= self
.readfile(file)
183 self
.log("un-watching logfile %s" % file.name
)
184 del self
.files_map
[fid
]
186 self
.callback(file.name
, lines
)
190 return "%xg%x" % (st
.st_dev
, st
.st_ino
)
193 for id, file in self
.files_map
.iteritems():
195 self
.files_map
.clear()
199 def __init__(self
, fpath
, keepdays
):
200 self
.committime
= int(time
.time())
201 self
.datestamp
= self
._getdatestamp
()
202 self
.keepdays
= keepdays
203 self
.conn
= sqlite3
.connect(fpath
)
204 self
.cur
= self
.conn
.cursor()
205 self
.cur
.execute("""CREATE TABLE IF NOT EXISTS logentries
211 pviews_avg_us INTEGER,
212 pview_50th_us INTEGER,
213 pview_80th_us INTEGER,
214 unique_users INTEGER )""")
215 self
.cur
.execute("""CREATE INDEX IF NOT EXISTS logentries_primary_idx
216 ON logentries (timestamp,hostname)""")
218 def parseline(self
, line
):
220 # Mar 23 19:57:34 rl01-3-v1552 www_stats: stats {"hits":1,"pviews":0,"pview_avg_us":0,"boundary_epoch":1427140620.0,"unique_users":0,"pview_80th_us":0,"pview_50th_us":0,"ajax_hits":0}
222 parts
= line
.split(' ', 6)
224 stats
= json
.loads(parts
[6])
225 stats
['hostname'] = hostname
226 # cast a few values to int
227 stats
['boundary_epoch'] = int(stats
['boundary_epoch'])
228 stats
['pview_avg_us'] = int(stats
['pview_avg_us'])
229 stats
['pview_80th_us'] = int(stats
['pview_80th_us'])
230 stats
['pview_50th_us'] = int(stats
['pview_50th_us'])
231 # workaround a buglet in compresslog v0.7
232 # that gives us a list containing just one int for unique users :-/
233 if isinstance(stats
['unique_users'], list):
234 stats
['unique_users'] = stats
['unique_users'][0]
236 e
= sys
.exc_info()[0]
237 sys
.stderr
.write('Caught exception %s while processing %s\n' % (e
, line
))
239 self
.cur
.execute("""INSERT INTO logentries VALUES(:boundary_epoch, :hostname, :hits, :ajax_hits,
240 :pviews, :pview_avg_us, :pview_50th_us, :pview_80th_us,
241 :unique_users)""", stats
)
242 # TODO: if this turns out to be a bottleneck
243 # we can defer commits to every N inserts
244 # (and add a commit on exit, adding sighandlers)
248 def maybe_commit(self
):
249 now
= int(time
.time())
250 if (self
.committime
+ 20) < now
:
252 self
.committime
= now
255 cur_datestamp
= self
._getdatestamp
()
256 if not self
.datestamp
== cur_datestamp
:
257 self
.datestamp
= cur_datestamp
259 # this will trigger soon after midnight
260 # so we want to preserve the last 24hs
261 gcepoch
= int(time
.time()) - (24 * 60 * 60 * self
.keepdays
)
262 self
.cur
.execute('DELETE FROM logentries WHERE timestamp < ?', list([gcepoch
]) )
264 self
.cur
.execute('VACUUM')
266 def _getdatestamp(self
):
267 return time
.strftime('%Y%m%d', time
.localtime())
272 from optparse
import OptionParser
274 if __name__
== '__main__':
275 parser
= OptionParser("usage: %prog [options] /path/to/logdir /path/to/db.sqlite")
276 parser
.add_option('-k', '--keep', dest
='keep', action
='store', type='int', default
=2,
277 help='Days of data to keep, defaults to 2')
278 parser
.add_option('-p', '--pidfile', dest
='pidfile', action
='store', type='string',
279 help='Pidfile, to help service scripts.')
280 (opts
, args
) = parser
.parse_args()
283 if not os
.path
.isdir(logdir
):
284 sys
.stderr
.write('First argument should be log directory\n')
288 open(opts
.pidfile
, 'w').write(str(os
.getpid()))
290 parser
= Parser(sqlfname
, int(opts
.keep
))
291 def callback(filename
, lines
):
293 parser
.parseline(line
)
295 l
= LogWatcher(logdir
, callback
)