watchlog: be more resilient, log strange input and soldier on
#!/usr/bin/env python

"""
Real-time log file watcher supporting log rotation.

Authors: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
         Martin Langhoff <martin.langhoff@remote-learner.net>

License: MIT
"""

import os
import time
import errno
import stat
import sqlite3
import json
import sys
from optparse import OptionParser


class LogWatcher(object):
    """Looks for changes in all files of a directory.
    This is useful for watching log file changes in real time.
    It also supports file rotation.

    Example:

    >>> def callback(filename, lines):
    ...     print filename, lines
    ...
    >>> l = LogWatcher("/var/log/", callback)
    >>> l.loop()
    """

    def __init__(self, folder, callback, extensions=["log"], tail_lines=0):
        """Arguments:

        (str) @folder:
            the folder to watch

        (callable) @callback:
            a function which is called every time a new line in a
            file being watched is found;
            this is called with "filename" and "lines" arguments.

        (list) @extensions:
            only watch files with these extensions

        (int) @tail_lines:
            read last N lines from files being watched before starting
        """
        self.files_map = {}
        self.callback = callback
        self.folder = os.path.realpath(folder)
        self.extensions = extensions
        assert os.path.isdir(self.folder), "%s does not exist" % self.folder
        assert callable(callback)
        self.update_files()
        # The first time we run the script we move all file markers to EOF.
        # For files created afterwards we don't do this.
        for fid, file in self.files_map.iteritems():
            file.seek(os.path.getsize(file.name))  # EOF
            if tail_lines:
                lines = self.tail(file.name, tail_lines)
                if lines:
                    self.callback(file.name, lines)

    def __del__(self):
        self.close()

    def loop(self, interval=0.1, async=False):
        """Start the loop.
        If async is True make one loop then return.
        """
        while 1:
            self.update_files()
            for fid, file in list(self.files_map.iteritems()):
                self.readfile(file)
            if async:
                return
            time.sleep(interval)

    def log(self, line):
        """Log when a file is un/watched."""
        print line

    def listdir(self):
        """List the directory and filter files by extension.
        You may want to override this to add extra logic or
        globbing support.
        """
        ls = os.listdir(self.folder)
        if self.extensions:
            return [x for x in ls
                    if os.path.splitext(x)[1][1:] in self.extensions]
        else:
            return ls

    @staticmethod
    def tail(fname, window):
        """Read the last N lines from file fname."""
        try:
            f = open(fname, 'r')
        except IOError, err:
            if err.errno == errno.ENOENT:
                return []
            else:
                raise
        else:
            BUFSIZ = 1024
            f.seek(0, os.SEEK_END)
            fsize = f.tell()
            block = -1
            data = ""
            exit = False
            while not exit:
                step = (block * BUFSIZ)
                if abs(step) >= fsize:
                    f.seek(0)
                    exit = True
                else:
                    f.seek(step, os.SEEK_END)
                data = f.read().strip()
                if data.count('\n') >= window:
                    break
                else:
                    block -= 1
            return data.splitlines()[-window:]

    def update_files(self):
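        # Re-scan the directory: snapshot current files, drop watches for
        # files that vanished or were rotated, then pick up new arrivals.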
        ls = []
        for name in self.listdir():
            absname = os.path.realpath(os.path.join(self.folder, name))
            try:
                st = os.stat(absname)
            except EnvironmentError, err:
                if err.errno != errno.ENOENT:
                    raise
            else:
                if not stat.S_ISREG(st.st_mode):
                    continue
                fid = self.get_file_id(st)
                ls.append((fid, absname))

        # check existing files
        for fid, file in list(self.files_map.iteritems()):
            try:
                st = os.stat(file.name)
            except EnvironmentError, err:
                if err.errno == errno.ENOENT:
                    self.unwatch(file, fid)
                else:
                    raise
            else:
                if fid != self.get_file_id(st):
                    # same name but different file (rotation); reload it
                    self.unwatch(file, fid)
                    self.watch(file.name)

        # add new ones
        for fid, fname in ls:
            if fid not in self.files_map:
                self.watch(fname)

    def readfile(self, file):
        lines = file.readlines()
        if lines:
            self.callback(file.name, lines)

    def watch(self, fname):
        try:
            file = open(fname, "r")
            fid = self.get_file_id(os.stat(fname))
        except EnvironmentError, err:
            if err.errno != errno.ENOENT:
                raise
        else:
            self.log("watching logfile %s" % fname)
            self.files_map[fid] = file

    def unwatch(self, file, fid):
        # File no longer exists; if it was renamed, read it one last time
        # in case the log rotator wrote something to it. readfile() fires
        # the callback itself for any remaining lines.
        self.readfile(file)
        self.log("un-watching logfile %s" % file.name)
        del self.files_map[fid]

    @staticmethod
    def get_file_id(st):
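        # Identify a file by device and inode rather than by name, so a
        # rotated path pointing at a brand-new file gets a new id.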
        return "%xg%x" % (st.st_dev, st.st_ino)

    def close(self):
        for fid, file in self.files_map.iteritems():
            file.close()
        self.files_map.clear()


class Parser(object):

    def __init__(self, fpath, keepdays):
        self.committime = int(time.time())
        self.datestamp = self._getdatestamp()
        self.keepdays = keepdays
        self.conn = sqlite3.connect(fpath)
        self.cur = self.conn.cursor()
        self.cur.execute("""CREATE TABLE IF NOT EXISTS logentries
                            (timestamp INTEGER,
                             hostname TEXT,
                             hits INTEGER,
                             ajax_hits INTEGER,
                             pviews INTEGER,
                             pviews_avg_us INTEGER,
                             pview_50th_us INTEGER,
                             pview_80th_us INTEGER,
                             unique_users INTEGER)""")
        self.cur.execute("""CREATE INDEX IF NOT EXISTS logentries_primary_idx
                            ON logentries (timestamp,hostname)""")
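        # The (timestamp, hostname) index backs time-windowed lookups,
        # e.g. (hypothetical query, not part of this script):
        #   SELECT hostname, SUM(pviews) FROM logentries
        #   WHERE timestamp > ? GROUP BY hostname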

    def parseline(self, line):
        # sample stats line:
        # Mar 23 19:57:34 rl01-3-v1552 www_stats: stats {"hits":1,"pviews":0,"pview_avg_us":0,"boundary_epoch":1427140620.0,"unique_users":0,"pview_80th_us":0,"pview_50th_us":0,"ajax_hits":0}
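        # For the sample above, line.split(' ', 6) yields:
        #   parts[3] -> 'rl01-3-v1552'   (hostname)
        #   parts[6] -> '{"hits":1,...}' (the JSON payload, kept whole
        #                because the split is capped at 6)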
        try:
            parts = line.split(' ', 6)
            hostname = parts[3]
            stats = json.loads(parts[6])
            stats['hostname'] = hostname
            # cast a few values to int
            stats['boundary_epoch'] = int(stats['boundary_epoch'])
            stats['pview_avg_us'] = int(stats['pview_avg_us'])
            stats['pview_80th_us'] = int(stats['pview_80th_us'])
            stats['pview_50th_us'] = int(stats['pview_50th_us'])
            # workaround a buglet in compresslog v0.7
            # that gives us a list containing just one int for unique users :-/
            if isinstance(stats['unique_users'], list):
                stats['unique_users'] = stats['unique_users'][0]
        except Exception:
            e = sys.exc_info()[0]
            sys.stderr.write('Caught exception %s while processing %s\n' % (e, line))
            # strange input: log it and soldier on, skipping the insert
            return

        self.cur.execute("""INSERT INTO logentries VALUES(:boundary_epoch, :hostname, :hits, :ajax_hits,
                            :pviews, :pview_avg_us, :pview_50th_us, :pview_80th_us,
                            :unique_users)""", stats)
        # TODO: if this turns out to be a bottleneck
        # we can defer commits to every N inserts
        # (and add a commit on exit, adding sighandlers)
        self.maybe_commit()
        self.maybe_gc()

    def maybe_commit(self):
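        # Commit at most once every 20 seconds rather than per insert,
        # amortizing the cost of the implied fsync.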
        now = int(time.time())
        if (self.committime + 20) < now:
            self.conn.commit()
            self.committime = now

    def maybe_gc(self):
        cur_datestamp = self._getdatestamp()
        if self.datestamp != cur_datestamp:
            self.datestamp = cur_datestamp
            # GC: this triggers soon after midnight, preserving
            # the last `keepdays` days of data
            gcepoch = int(time.time()) - (24 * 60 * 60 * self.keepdays)
            self.cur.execute('DELETE FROM logentries WHERE timestamp < ?',
                             (gcepoch,))
            self.conn.commit()
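            # VACUUM after the bulk DELETE so the database file actually
            # shrinks; SQLite otherwise keeps the freed pages allocated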
            self.cur.execute('VACUUM')

    def _getdatestamp(self):
        return time.strftime('%Y%m%d', time.localtime())


if __name__ == '__main__':
    parser = OptionParser("usage: %prog [options] /path/to/logdir /path/to/db.sqlite")
    parser.add_option('-k', '--keep', dest='keep', action='store', type='int', default=2,
                      help='Days of data to keep, defaults to 2')
    parser.add_option('-p', '--pidfile', dest='pidfile', action='store', type='string',
                      help='Pidfile, to help service scripts.')
    (opts, args) = parser.parse_args()
    if len(args) != 2:
        parser.error('expected two arguments: log directory and sqlite db path')
    logdir = args[0]
    sqlfname = args[1]
    if not os.path.isdir(logdir):
        sys.stderr.write('First argument should be the log directory\n')
        sys.exit(1)

    if opts.pidfile:
        open(opts.pidfile, 'w').write(str(os.getpid()))

    parser = Parser(sqlfname, int(opts.keep))

    def callback(filename, lines):
        for line in lines:
            parser.parseline(line)

    l = LogWatcher(logdir, callback)
    l.loop()
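
# Typical invocation (hypothetical paths):
#   ./watchlog -k 2 -p /var/run/watchlog.pid /var/log/apache2 /var/lib/watchlog/stats.sqlite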