2 // Copyright © 2011-2019 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
29 Logical network reader.
31 Read STOMP frames from the connection, create MessageData
32 structures from the received data, and push the MessageData to the client.
34 func (c
*Connection
) reader() {
40 c
.logx("RDR_RECEIVE_FRAME", f
.Command
, f
.Headers
, HexData(f
.Body
),
46 f
.Headers
= append(f
.Headers
, "connection_read_error", e
.Error())
47 md
:= MessageData
{Message(f
), e
}
49 if e
== io
.EOF
&& !c
.connected
{
50 c
.log("RDR_SHUTDOWN_EOF", e
)
52 c
.log("RDR_CONN_GENL_ERR", e
)
62 c
.mets
.tfr
+= 1 // Total frames read
63 // Headers already decoded
64 c
.mets
.tbr
+= m
.Size(false) // Total bytes read
66 //*************************************************************************
68 md
:= MessageData
{m
, e
}
72 sid
, ok
:= f
.Headers
.Contains(HK_SUBSCRIPTION
)
73 if !ok
{ // This should *NEVER* happen
74 panic(fmt
.Sprintf("stompngo INTERNAL ERROR: command:<%s> headers:<%v>",
75 f
.Command
, f
.Headers
))
78 ps
, sok
:= c
.subs
[sid
] // This is a map of pointers .....
81 // The sub can be gone under some timing conditions. In that case
82 // we log it of possible, and continue (hope for the best).
83 c
.log("RDR_NOSUB", sid
, m
.Command
, m
.Headers
)
87 // The sub can also already be closed under some conditions.
88 // Again, we log that if possible, and continue
89 c
.log("RDR_CLSUB", sid
, m
.Command
, m
.Headers
)
92 // Handle subscription draining
101 c
.logx("RDR_DROPM", ps
.drmc
, sid
, m
.Command
,
102 m
.Headers
, HexData(m
.Body
))
119 panic(fmt
.Sprintf("Broker SEVERE ERROR, not STOMP? command:<%s> headers:<%v>",
120 f
.Command
, f
.Headers
))
123 //*************************************************************************
127 c
.log("RDR_SHUTDOWN detected")
138 c
.log("RDR_SHUTDOWN", time
.Now())
142 Physical frame reader.
144 This parses a single STOMP frame from data off of the wire, and
145 returns a Frame, with a possible error.
147 Note: this functionality could hang or exhibit other erroneous behavior
148 if running against a non-compliant STOMP server.
150 func (c
*Connection
) readFrame() (f Frame
, e error
) {
151 f
= Frame
{"", Headers
{}, NULLBUFF
}
153 // Read f.Command or line ends (maybe heartbeats)
155 s
, e
:= c
.rdr
.ReadString('\n')
156 if c
.checkReadError(e
) != nil {
165 f
.Command
= s
[0 : len(s
)-1]
170 // Validate the command
171 if _
, ok
:= validCmds
[f
.Command
]; !ok
{
172 ev
:= fmt
.Errorf("%s\n%s", EINVBCMD
, HexData([]byte(f
.Command
)))
178 s
, e
:= c
.rdr
.ReadString('\n')
179 if c
.checkReadError(e
) != nil {
189 p
:= strings
.SplitN(s
, ":", 2)
193 // Always decode regardless of protocol level. See issue #47.
196 f
.Headers
= append(f
.Headers
, p
[0], p
[1])
199 e
= checkHeaders(f
.Headers
, c
.Protocol())
204 if v
, ok
:= f
.Headers
.Contains(HK_CONTENT_LENGTH
); ok
{
205 l
, e
:= strconv
.Atoi(strings
.TrimSpace(v
))
210 f
.Body
, e
= readUntilNul(c
)
212 f
.Body
, e
= readBody(c
, l
)
215 // content-length not present
216 f
.Body
, e
= readUntilNul(c
)
218 if c
.checkReadError(e
) != nil {
224 // End of read loop - set no deadline
226 _
= c
.netconn
.SetReadDeadline(c
.dld
.t0
)
231 func (c
*Connection
) updateHBReads() {
233 c
.hbd
.lr
= time
.Now().UnixNano() // Latest good read
237 func (c
*Connection
) setReadDeadline() {
238 if c
.dld
.rde
&& c
.dld
.rds
{
239 _
= c
.netconn
.SetReadDeadline(time
.Now().Add(c
.dld
.rdld
))
243 func (c
*Connection
) checkReadError(e error
) error
{
244 //c.log("checkReadError", e)
248 ne
, ok
:= e
.(net
.Error
)
253 //c.log("is a timeout")
255 c
.log("invoking read deadline callback")
256 c
.dld
.dlnotify(e
, false)