2 // Copyright © 2011-2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
26 Logical network reader.
28 Read STOMP frames from the connection, create MessageData
29 structures from the received data, and push the MessageData to the client.
31 func (c
*Connection
) reader() {
33 q
:= false // Shutdown indicator
37 c
.log("RDR_RECEIVE_FRAME", f
, "RDR_RECEIVE_ERR", e
)
39 f
.Headers
= append(f
.Headers
, "connection_read_error", e
.Error())
40 md
:= MessageData
{Message(f
), e
}
45 if f
.Command
== "" && q
{
50 c
.mets
.tfr
+= 1 // Total frames read
51 // Headers already decoded
52 c
.mets
.tbr
+= m
.Size(false) // Total bytes read
53 md
:= MessageData
{m
, e
}
55 // TODO START - can this be simplified ? Look cleaner ?
57 if sid
, ok
:= f
.Headers
.Contains(HK_SUBSCRIPTION
); ok
{
58 // This is a read lock
60 // This sub can be already gone under some timing circumstances
61 if _
, sok
:= c
.subs
[sid
]; sok
{
62 // And it can also be closed under some timing circumstances
64 c
.log("RDR_CLSUB", sid
, md
)
68 if c
.subs
[sid
].drmc
> c
.subs
[sid
].dra
{
69 c
.log("RDR_DROPM", c
.subs
[sid
].drmc
, sid
, md
)
78 c
.log("RDR_NOSUB", sid
, md
)
82 // RECEIPTs and ERRORs are never drained. They actually cannot
83 // be drained in any logical manner because they do not have a
84 // 'subscription' header.
100 c
.log("RDR_SHUTDOWN", time
.Now())
104 Physical frame reader.
106 This parses a single STOMP frame from data off of the wire, and
107 returns a Frame, with a possible error.
109 Note: this functionality could hang or exhibit other erroneous behavior
110 if running against a non-compliant STOMP server.
112 func (c
*Connection
) readFrame() (f Frame
, e error
) {
113 f
= Frame
{"", Headers
{}, NULLBUFF
}
114 // Read f.Command or line ends (maybe heartbeats)
116 s
, e
:= c
.rdr
.ReadString('\n')
126 f
.Command
= s
[0 : len(s
)-1]
130 // c.log("read slash n")
132 // Validate the command
133 if _
, ok
:= validCmds
[f
.Command
]; !ok
{
138 s
, e
:= c
.rdr
.ReadString('\n')
149 p
:= strings
.SplitN(s
, ":", 2)
153 if c
.Protocol() != SPL_10
{
157 f
.Headers
= append(f
.Headers
, p
[0], p
[1])
160 e
= checkHeaders(f
.Headers
, c
.Protocol())
165 if v
, ok
:= f
.Headers
.Contains(HK_CONTENT_LENGTH
); ok
{
166 l
, e
:= strconv
.Atoi(strings
.TrimSpace(v
))
171 f
.Body
, e
= readUntilNul(c
.rdr
)
173 f
.Body
, e
= readBody(c
.rdr
, l
)
176 // content-length not present
177 f
.Body
, e
= readUntilNul(c
.rdr
)
189 func (c
*Connection
) updateHBReads() {
191 c
.hbd
.lr
= time
.Now().UnixNano() // Latest good read