From 8d6f2e3fe8851b581309da25fc4c32f8be675932 Mon Sep 17 00:00:00 2001
From: Brian Goff <cpuguy83@gmail.com>
Date: Mon, 11 Jul 2016 16:31:42 -0400
Subject: [PATCH] Fix issues with tailing rotated jsonlog file
Fixes a race where the log reader would get events both for an actual
rotation and from fsnotify (`fsnotify.Rename`).
This issue becomes extremely apparent when rotations are fast, for
example:

```
$ docker run -d --name test --log-opt max-size=1 --log-opt max-file=2 \
    busybox sh -c 'while true; do echo hello; usleep 100000; done'
```

With this change the log reader for jsonlogs can handle rotations that
happen as fast as the above example.

Instead of listening for both fs events AND rotation events
simultaneously, potentially meaning we see 2 rotations for only a single
rotation due to channel buffering, only listen for fs events (like
`Rename`) and then wait to be notified about rotation by the logger.
This makes sure that we don't see 2 rotations for 1, and that we don't
start trying to read until the logger is actually ready for us to.
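
A minimal sketch of that flow (illustrative only, not the code in this
patch: `followRotations` and the `done` channel are assumed names, while
`notifyRotate` mirrors the channel passed to `followLogs`):

```
package logfollow

import (
    "log"
    "os"

    "gopkg.in/fsnotify.v1"
)

// followRotations reacts only to fs events; when the file is renamed or
// removed it waits for the logger's rotation notification before reopening,
// so one rotation produces exactly one reaction.
func followRotations(name string, notifyRotate <-chan interface{}, done <-chan struct{}) error {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        return err
    }
    defer watcher.Close()

    if err := watcher.Add(name); err != nil {
        return err
    }
    f, err := os.Open(name)
    if err != nil {
        return err
    }
    defer func() { f.Close() }()

    for {
        select {
        case e := <-watcher.Events:
            switch {
            case e.Op&fsnotify.Write == fsnotify.Write:
                // new data was appended; the caller would decode more of f here
            case e.Op&fsnotify.Rename == fsnotify.Rename, e.Op&fsnotify.Remove == fsnotify.Remove:
                // the file was rotated away: drop the stale watch, wait for the
                // logger to say the new file is in place, then reopen and rewatch
                watcher.Remove(name)
                <-notifyRotate
                f.Close()
                if f, err = os.Open(name); err != nil {
                    return err
                }
                if err := watcher.Add(name); err != nil {
                    return err
                }
            }
        case err := <-watcher.Errors:
            log.Printf("error watching %s: %v", name, err)
        case <-done:
            return nil
        }
    }
}
```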

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

This commit is a pending upstream commit that fixes broken log tailing. The
original commit can be found in the PR here:

- https://github.com/docker/docker/pull/24514

Signed-off-by: Christian Stewart <christian@paral.in>
---
 daemon/logger/jsonfilelog/read.go | 180 +++++++++++++++++++++++++-------------
 1 file changed, 119 insertions(+), 61 deletions(-)

diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index bea83dd..0cb44af 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -3,11 +3,14 @@ package jsonfilelog
+    "gopkg.in/fsnotify.v1"
     "github.com/Sirupsen/logrus"
     "github.com/docker/docker/daemon/logger"
     "github.com/docker/docker/pkg/filenotify"
@@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
     defer close(logWatcher.Msg)
+    // lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
+    // This will block writes!!!
     pth := l.writer.LogPath()
     var files []io.ReadSeeker
     for i := l.writer.MaxFiles(); i > 1; i-- {
@@ -61,6 +68,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
     latestFile, err := os.Open(pth)
@@ -80,6 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
         if err := latestFile.Close(); err != nil {
             logrus.Errorf("Error closing file: %v", err)
@@ -87,7 +96,6 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
     latestFile.Seek(0, os.SEEK_END)
     l.readers[logWatcher] = struct{}{}
@@ -128,92 +136,142 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
+func watchFile(name string) (filenotify.FileWatcher, error) {
+    fileWatcher, err := filenotify.New()
+    if err := fileWatcher.Add(name); err != nil {
+        logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
+        fileWatcher.Close()
+        fileWatcher = filenotify.NewPollingWatcher()
+        if err := fileWatcher.Add(name); err != nil {
+            fileWatcher.Close()
+            logrus.Debugf("error watching log file for modifications: %v", err)
+    return fileWatcher, nil
 func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
     dec := json.NewDecoder(f)
     l := &jsonlog.JSONLog{}
-    fileWatcher, err := filenotify.New()
+    fileWatcher, err := watchFile(name)
         logWatcher.Err <- err
-    if err := fileWatcher.Add(name); err != nil {
-        logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
-        fileWatcher.Close()
-        fileWatcher = filenotify.NewPollingWatcher()
+    handleRotate := func() error {
+        fileWatcher.Remove(name)
+        // retry when the file doesn't exist
+        for retries := 0; retries <= 5; retries++ {
+            f, err = os.Open(name)
+            if err == nil || !os.IsNotExist(err) {
         if err := fileWatcher.Add(name); err != nil {
-            logrus.Debugf("error watching log file for modifications: %v", err)
-            logWatcher.Err <- err
+        dec = json.NewDecoder(f)
-        msg, err := decodeLogLine(dec, l)
-            // try again because this shouldn't happen
-            if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-                dec = json.NewDecoder(f)
+    errRetry := errors.New("retry")
+    errDone := errors.New("done")
+    waitRead := func() error {
+        case e := <-fileWatcher.Events():
+            case fsnotify.Write:
+                dec = json.NewDecoder(f)
+            case fsnotify.Rename, fsnotify.Remove:
+                if err := handleRotate(); err != nil {
-            // io.ErrUnexpectedEOF is returned from json.Decoder when there is
-            // remaining data in the parser's buffer while an io.EOF occurs.
-            // If the json logger writes a partial json log entry to the disk
-            // while at the same time the decoder tries to decode it, the race condition happens.
-            if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-                reader := io.MultiReader(dec.Buffered(), f)
-                dec = json.NewDecoder(reader)
+        case err := <-fileWatcher.Errors():
+            logrus.Debugf("logger got error watching file: %v", err)
+            // Something happened, let's try and stay alive and create a new watcher
+            fileWatcher, err = watchFile(name)
+        case <-logWatcher.WatchClose():
+            fileWatcher.Remove(name)
-            case <-fileWatcher.Events():
-                dec = json.NewDecoder(f)
-            case <-fileWatcher.Errors():
-                logWatcher.Err <- err
-            case <-logWatcher.WatchClose():
-                fileWatcher.Remove(name)
-            case <-notifyRotate:
-                fileWatcher.Remove(name)
-                // retry when the file doesn't exist
-                for retries := 0; retries <= 5; retries++ {
-                    f, err = os.Open(name)
-                    if err == nil || !os.IsNotExist(err) {
+    handleDecodeErr := func(err error) error {
+            for err := waitRead(); err != nil; {
+                if err == errRetry {
+                    // retry the waitRead
+        // try again because this shouldn't happen
+        if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+            dec = json.NewDecoder(f)
+        // io.ErrUnexpectedEOF is returned from json.Decoder when there is
+        // remaining data in the parser's buffer while an io.EOF occurs.
+        // If the json logger writes a partial json log entry to the disk
+        // while at the same time the decoder tries to decode it, the race condition happens.
+        if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
+            reader := io.MultiReader(dec.Buffered(), f)
+            dec = json.NewDecoder(reader)
-                if err = fileWatcher.Add(name); err != nil {
-                    logWatcher.Err <- err
-                    logWatcher.Err <- err
+        msg, err := decodeLogLine(dec, l)
+            if err := handleDecodeErr(err); err != nil {
+                if err == errDone {
-                dec = json.NewDecoder(f)
+                // we got an unrecoverable error, so return
+                logWatcher.Err <- err
+            // ready to try again
         retries = 0 // reset retries since we've succeeded
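
For readability, the `watchFile` helper added above (with the lines the
truncated hunk is missing, such as the error check after `filenotify.New`
and the closing braces, filled in here by assumption) amounts to roughly:

```
package logfollow

import (
    "github.com/Sirupsen/logrus"
    "github.com/docker/docker/pkg/filenotify"
)

// watchFile prefers the platform's fs-event watcher and falls back to the
// polling watcher when the watch cannot be established.
func watchFile(name string) (filenotify.FileWatcher, error) {
    fileWatcher, err := filenotify.New()
    if err != nil {
        return nil, err
    }

    if err := fileWatcher.Add(name); err != nil {
        logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
        fileWatcher.Close()
        fileWatcher = filenotify.NewPollingWatcher()

        if err := fileWatcher.Add(name); err != nil {
            fileWatcher.Close()
            logrus.Debugf("error watching log file for modifications: %v", err)
            return nil, err
        }
    }
    return fileWatcher, nil
}
```

This is the same fallback-to-polling logic that the deleted lines performed
inline in `followLogs`; pulling it into a helper is what lets the follower
recreate the watcher after a watch error (the `fileWatcher, err = watchFile(name)`
case above).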