Version bump.
[stompngo.git] / reader.go
blobcc405d056a3d4067548cfc6b2e086c4b015d6950
1 //
2 // Copyright © 2011-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "strconv"
21 "strings"
22 "time"
26 Logical network reader.
28 Read STOMP frames from the connection, create MessageData
29 structures from the received data, and push the MessageData to the client.
31 func (c *Connection) reader() {
33 q := false // Shutdown indicator
35 for {
36 f, e := c.readFrame()
37 c.log("RDR_RECEIVE_FRAME", f, "RDR_RECEIVE_ERR", e)
38 if e != nil {
39 f.Headers = append(f.Headers, "connection_read_error", e.Error())
40 md := MessageData{Message(f), e}
41 c.handleReadError(md)
42 break
45 if f.Command == "" && q {
46 break
49 m := Message(f)
50 c.mets.tfr += 1 // Total frames read
51 // Headers already decoded
52 c.mets.tbr += m.Size(false) // Total bytes read
53 md := MessageData{m, e}
55 // TODO START - can this be simplified ? Look cleaner ?
57 if sid, ok := f.Headers.Contains(HK_SUBSCRIPTION); ok {
58 // This is a read lock
59 c.subsLock.RLock()
60 // This sub can be already gone under some timing circumstances
61 if _, sok := c.subs[sid]; sok {
62 // And it can also be closed under some timing circumstances
63 if c.subs[sid].cs {
64 c.log("RDR_CLSUB", sid, md)
65 } else {
66 if c.subs[sid].drav {
67 c.subs[sid].drmc++
68 if c.subs[sid].drmc > c.subs[sid].dra {
69 c.log("RDR_DROPM", c.subs[sid].drmc, sid, md)
70 } else {
71 c.subs[sid].md <- md
73 } else {
74 c.subs[sid].md <- md
77 } else {
78 c.log("RDR_NOSUB", sid, md)
80 c.subsLock.RUnlock()
81 } else {
82 // RECEIPTs and ERRORs are never drained. They actually cannot
83 // be drained in any logical manner because they do not have a
84 // 'subscription' header.
85 c.input <- md
88 // TODO END
90 select {
91 case q = <-c.rsd:
92 default:
94 if q {
95 break
99 close(c.input)
100 c.log("RDR_SHUTDOWN", time.Now())
104 Physical frame reader.
106 This parses a single STOMP frame from data off of the wire, and
107 returns a Frame, with a possible error.
109 Note: this functionality could hang or exhibit other erroneous behavior
110 if running against a non-compliant STOMP server.
112 func (c *Connection) readFrame() (f Frame, e error) {
113 f = Frame{"", Headers{}, NULLBUFF}
114 // Read f.Command or line ends (maybe heartbeats)
115 for {
116 s, e := c.rdr.ReadString('\n')
117 if s == "" {
118 return f, e
120 if e != nil {
121 return f, e
123 if c.hbd != nil {
124 c.updateHBReads()
126 f.Command = s[0 : len(s)-1]
127 if s != "\n" {
128 break
130 // c.log("read slash n")
132 // Validate the command
133 if _, ok := validCmds[f.Command]; !ok {
134 return f, EINVBCMD
136 // Read f.Headers
137 for {
138 s, e := c.rdr.ReadString('\n')
139 if e != nil {
140 return f, e
142 if c.hbd != nil {
143 c.updateHBReads()
145 if s == "\n" {
146 break
148 s = s[0 : len(s)-1]
149 p := strings.SplitN(s, ":", 2)
150 if len(p) != 2 {
151 return f, EUNKHDR
153 if c.Protocol() != SPL_10 {
154 p[0] = decode(p[0])
155 p[1] = decode(p[1])
157 f.Headers = append(f.Headers, p[0], p[1])
160 e = checkHeaders(f.Headers, c.Protocol())
161 if e != nil {
162 return f, e
164 // Read f.Body
165 if v, ok := f.Headers.Contains(HK_CONTENT_LENGTH); ok {
166 l, e := strconv.Atoi(strings.TrimSpace(v))
167 if e != nil {
168 return f, e
170 if l == 0 {
171 f.Body, e = readUntilNul(c.rdr)
172 } else {
173 f.Body, e = readBody(c.rdr, l)
175 } else {
176 // content-length not present
177 f.Body, e = readUntilNul(c.rdr)
179 if e != nil {
180 return f, e
182 if c.hbd != nil {
183 c.updateHBReads()
186 return f, e
189 func (c *Connection) updateHBReads() {
190 c.hbd.rdl.Lock()
191 c.hbd.lr = time.Now().UnixNano() // Latest good read
192 c.hbd.rdl.Unlock()