Address issue #50.
[stompngo.git] / reader.go
blob018b952ab75343aa55d09b742e7b73ab6a7e6015
1 //
2 // Copyright © 2011-2019 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "fmt"
21 "io"
22 "net"
23 "strconv"
24 "strings"
25 "time"
29 Logical network reader.
31 Read STOMP frames from the connection, create MessageData
32 structures from the received data, and push the MessageData to the client.
34 func (c *Connection) reader() {
35 readLoop:
36 for {
37 f, e := c.readFrame()
38 logLock.Lock()
39 if c.logger != nil {
40 c.logx("RDR_RECEIVE_FRAME", f.Command, f.Headers, HexData(f.Body),
41 "RDR_RECEIVE_ERR", e)
43 logLock.Unlock()
44 if e != nil {
45 //debug.PrintStack()
46 f.Headers = append(f.Headers, "connection_read_error", e.Error())
47 md := MessageData{Message(f), e}
48 c.handleReadError(md)
49 if e == io.EOF && !c.connected {
50 c.log("RDR_SHUTDOWN_EOF", e)
51 } else {
52 c.log("RDR_CONN_GENL_ERR", e)
54 break readLoop
57 if f.Command == "" {
58 continue readLoop
61 m := Message(f)
62 c.mets.tfr += 1 // Total frames read
63 // Headers already decoded
64 c.mets.tbr += m.Size(false) // Total bytes read
66 //*************************************************************************
67 // Replacement START
68 md := MessageData{m, e}
69 switch f.Command {
71 case MESSAGE:
72 sid, ok := f.Headers.Contains(HK_SUBSCRIPTION)
73 if !ok { // This should *NEVER* happen
74 panic(fmt.Sprintf("stompngo INTERNAL ERROR: command:<%s> headers:<%v>",
75 f.Command, f.Headers))
77 c.subsLock.RLock()
78 ps, sok := c.subs[sid] // This is a map of pointers .....
80 if !sok {
81 // The sub can be gone under some timing conditions. In that case
82 // we log it of possible, and continue (hope for the best).
83 c.log("RDR_NOSUB", sid, m.Command, m.Headers)
84 goto csRUnlock
86 if ps.cs {
87 // The sub can also already be closed under some conditions.
88 // Again, we log that if possible, and continue
89 c.log("RDR_CLSUB", sid, m.Command, m.Headers)
90 goto csRUnlock
92 // Handle subscription draining
93 switch ps.drav {
94 case false:
95 ps.md <- md
96 default:
97 ps.drmc++
98 if ps.drmc > ps.dra {
99 logLock.Lock()
100 if c.logger != nil {
101 c.logx("RDR_DROPM", ps.drmc, sid, m.Command,
102 m.Headers, HexData(m.Body))
104 logLock.Unlock()
105 } else {
106 ps.md <- md
109 csRUnlock:
110 c.subsLock.RUnlock()
112 case ERROR:
113 fallthrough
115 case RECEIPT:
116 c.input <- md
118 default:
119 panic(fmt.Sprintf("Broker SEVERE ERROR, not STOMP? command:<%s> headers:<%v>",
120 f.Command, f.Headers))
122 // Replacement END
123 //*************************************************************************
125 select {
126 case _ = <-c.ssdc:
127 c.log("RDR_SHUTDOWN detected")
128 break readLoop
129 default:
131 c.log("RDR_RELOOP")
133 close(c.input)
134 c.connLock.Lock()
135 c.connected = false
136 c.connLock.Unlock()
137 c.sysAbort()
138 c.log("RDR_SHUTDOWN", time.Now())
142 Physical frame reader.
144 This parses a single STOMP frame from data off of the wire, and
145 returns a Frame, with a possible error.
147 Note: this functionality could hang or exhibit other erroneous behavior
148 if running against a non-compliant STOMP server.
150 func (c *Connection) readFrame() (f Frame, e error) {
151 f = Frame{"", Headers{}, NULLBUFF}
153 // Read f.Command or line ends (maybe heartbeats)
154 c.setReadDeadline()
155 s, e := c.rdr.ReadString('\n')
156 if c.checkReadError(e) != nil {
157 return f, e
159 if s == "" {
160 return f, e
162 if c.hbd != nil {
163 c.updateHBReads()
165 f.Command = s[0 : len(s)-1]
166 if s == "\n" {
167 return f, e
170 // Validate the command
171 if _, ok := validCmds[f.Command]; !ok {
172 ev := fmt.Errorf("%s\n%s", EINVBCMD, HexData([]byte(f.Command)))
173 return f, ev
175 // Read f.Headers
176 for {
177 c.setReadDeadline()
178 s, e := c.rdr.ReadString('\n')
179 if c.checkReadError(e) != nil {
180 return f, e
182 if c.hbd != nil {
183 c.updateHBReads()
185 if s == "\n" {
186 break
188 s = s[0 : len(s)-1]
189 p := strings.SplitN(s, ":", 2)
190 if len(p) != 2 {
191 return f, EUNKHDR
193 // Always decode regardless of protocol level. See issue #47.
194 p[0] = decode(p[0])
195 p[1] = decode(p[1])
196 f.Headers = append(f.Headers, p[0], p[1])
199 e = checkHeaders(f.Headers, c.Protocol())
200 if e != nil {
201 return f, e
203 // Read f.Body
204 if v, ok := f.Headers.Contains(HK_CONTENT_LENGTH); ok {
205 l, e := strconv.Atoi(strings.TrimSpace(v))
206 if e != nil {
207 return f, e
209 if l == 0 {
210 f.Body, e = readUntilNul(c)
211 } else {
212 f.Body, e = readBody(c, l)
214 } else {
215 // content-length not present
216 f.Body, e = readUntilNul(c)
218 if c.checkReadError(e) != nil {
219 return f, e
221 if c.hbd != nil {
222 c.updateHBReads()
224 // End of read loop - set no deadline
225 if c.dld.rde {
226 _ = c.netconn.SetReadDeadline(c.dld.t0)
228 return f, e
231 func (c *Connection) updateHBReads() {
232 c.hbd.rdl.Lock()
233 c.hbd.lr = time.Now().UnixNano() // Latest good read
234 c.hbd.rdl.Unlock()
237 func (c *Connection) setReadDeadline() {
238 if c.dld.rde && c.dld.rds {
239 _ = c.netconn.SetReadDeadline(time.Now().Add(c.dld.rdld))
243 func (c *Connection) checkReadError(e error) error {
244 //c.log("checkReadError", e)
245 if e == nil {
246 return e
248 ne, ok := e.(net.Error)
249 if !ok {
250 return e
252 if ne.Timeout() {
253 //c.log("is a timeout")
254 if c.dld.dns {
255 c.log("invoking read deadline callback")
256 c.dld.dlnotify(e, false)
259 return e