2 // Copyright © 2011-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
// CONNERROR is the error type returned by connectHandler when the broker
// answers the initial CONNECT with an ERROR frame. It pairs an underlying
// error with a free-form description (connectHandler supplies ECONERR and
// the ERROR frame's body, in that order).
type CONNERROR struct {
	err  error  // underlying error; must be non-nil for Error to be callable
	desc string // additional detail appended to the message
}

// Error implements the error interface. The message is the underlying
// error's text, a colon, and the description: "<err>:<desc>".
func (e *CONNERROR) Error() string {
	return e.err.Error() + ":" + e.desc
}
37 Connection handler, one time use during initial connect.
39 Handle broker response, react to version incompatibilities, set up session,
40 and if necessary initialize heart beats.
// connectHandler processes the broker's reply to the initial CONNECT.
//
// Visible steps: wrap c.netconn in a bufio.Reader, read one NUL-terminated
// chunk from it, parse that chunk with connectResponse, store the result in
// c.ConnectResponse, fail with a *CONNERROR if the reply is an ERROR frame,
// negotiate the protocol level, look for a session header, start heart
// beats when the protocol is at least SPL_11, and add the response size to
// the c.mets.tbr counter.
//
// h is forwarded to setProtocolLevel and initializeHeartBeats; presumably
// it holds the headers the client sent on CONNECT — confirm against caller.
//
// NOTE(review): the text of this function is fragmentary here (no visible
// error checks after each step, no body for the HK_SESSION branch, no final
// return, unclosed braces). Verify against a complete copy before relying
// on these notes.
42 func (c
*Connection
) connectHandler(h Headers
) (e error
) {
43 //fmt.Printf("CHDB01\n")
// All reads for this connection go through a buffered reader over netconn.
44 c
.rdr
= bufio
.NewReader(c
.netconn
)
// ReadBytes(0) returns everything up to and including the first NUL byte,
// so b still carries the terminating NUL when it is handed on below.
45 b
, e
:= c
.rdr
.ReadBytes(0)
49 //fmt.Printf("CHDB02\n")
// Turn the raw bytes into a Frame (command, headers, body).
50 f
, e
:= connectResponse(string(b
))
54 //fmt.Printf("CHDB03\n")
// Keep the broker's reply on the connection as a Message.
56 c
.ConnectResponse
= &Message
{f
.Command
, f
.Headers
, f
.Body
}
// An ERROR reply is reported as a CONNERROR carrying the frame body as the
// description.
57 if c
.ConnectResponse
.Command
== ERROR
{
58 return &CONNERROR
{ECONERR
, string(f
.Body
)}
60 //fmt.Printf("CHDB04\n")
// Compare h against the broker's response headers to fix the protocol level.
62 e
= c
.setProtocolLevel(h
, c
.ConnectResponse
.Headers
)
66 //fmt.Printf("CHDB05\n")
// s is the value of the HK_SESSION header when the broker supplied one.
// NOTE(review): what is done with s is not visible here — presumably it is
// recorded as the session id; confirm.
68 if s
, ok
:= c
.ConnectResponse
.Headers
.Contains(HK_SESSION
); ok
{
// Heart beats are only initialized when the protocol level is >= SPL_11.
74 if c
.Protocol() >= SPL_11
{
75 e
= c
.initializeHeartBeats(h
)
80 //fmt.Printf("CHDB06\n")
// Add the size of the connect response to the tbr metric.
// NOTE(review): the meaning of Size's false argument is not shown here.
84 c
.mets
.tbr
+= c
.ConnectResponse
.Size(false)
89 Handle data from the wire after CONNECT is sent. Attempt to create a Frame from that data.
92 Called one time per connection at connection start.
// connectResponse parses the text s received after CONNECT into a *Frame.
//
// Visible steps: split s at the first newline into a first line (c[0]) and
// the remainder (c[1]); compare the first line against HandShake; check that
// the frame's Command is CONNECTED or ERROR; handle the header-less forms of
// the remainder; otherwise split the remainder at the first blank line into
// a header section and a body, and parse each header line as "key:value".
//
// Returns the frame and an error.
//
// NOTE(review): the text of this function is fragmentary here. The creation
// of f, the assignment of f.Command, the definition of w, the switch header
// and every return statement are not visible, so the specific error values
// cannot be documented from this view.
94 func connectResponse(s
string) (*Frame
, error
) {
// Start with an empty, non-nil body.
98 f
.Body
= make([]uint8, 0)
// c[0] is the first line of the response, c[1] (when present) the rest.
101 c
:= strings
.SplitN(s
, "\n", 2)
104 // fmt.Printf("lenc is: %d, data:%#v\n", len(c), c[0])
// True when the first line is byte-for-byte equal to HandShake.
// NOTE(review): HandShake is defined elsewhere; its meaning is not shown.
105 if bytes
.Compare(HandShake
, []byte(c
[0])) == 0 {
// Only CONNECTED and ERROR are expected commands at this point.
112 if f
.Command
!= CONNECTED
&& f
.Command
!= ERROR
{
117 case "\x00", "\n": // No headers, malformed bodies
// The malformed remainder is kept as the body.
118 f
.Body
= []uint8(c
[1])
120 case "\n\x00": // No headers, no body is OK
122 default: // Otherwise continue
// Split the remainder at the first blank line.
125 b
:= strings
.SplitN(c
[1], "\n\n", 2)
126 if len(b
) == 1 { // No Headers, b[0] == body
// Body is w without its final byte — presumably the trailing NUL frame
// terminator; w's definition is not visible here, confirm.
128 f
.Body
= w
[0 : len(w
)-1]
// Tests for a CONNECTED frame that carries body data.
129 if f
.Command
== CONNECTED
&& len(f
.Body
) > 0 {
136 // b[0] - the headers
// One header per line.
140 for _
, l
:= range strings
.Split(b
[0], "\n") {
// Split at the first ':' only, so a value may itself contain colons.
141 p
:= strings
.SplitN(l
, ":", 2)
143 f
.Body
= []uint8(p
[0]) // Bad feedback
// Headers are stored flat, as alternating key and value entries.
146 f
.Headers
= append(f
.Headers
, p
[0], p
[1])
// Same final-byte trim as in the header-less case above.
150 f
.Body
= w
[0 : len(w
)-1]
151 if f
.Command
== CONNECTED
&& len(f
.Body
) > 0 {
159 Check client version, one time use during initial connect.
// checkClientVersions inspects the headers h for the protocol versions the
// client asked for.
//
// Visible steps: read the HK_ACCEPT_VERSION header; treat an empty value as
// a request for 1.0; otherwise split the value on commas and check each
// entry against the supported list via hasValue; finally test whether h
// contains an HK_HOST header.
//
// NOTE(review): the text of this function is fragmentary here. The
// declaration of ok and every return statement are not visible, so the
// errors returned for "nothing supported" and "no host header" cannot be
// documented from this view.
161 func (c
*Connection
) checkClientVersions(h Headers
) (e error
) {
// w is the raw accept-version header value ("" when absent).
162 w
:= h
.Value(HK_ACCEPT_VERSION
)
163 if w
== "" { // Not present, client wants 1.0
// n = -1 places no limit on the number of pieces.
166 v
:= strings
.SplitN(w
, ",", -1) // Versions the client listed, comma separated
168 for _
, sv
:= range v
{
169 if hasValue(supported
, sv
) {
170 ok
= true // We support at least one version the client wants
// ok is reused here: it now reports whether the HK_HOST header is present.
176 if _
, ok
= h
.Contains(HK_HOST
); !ok
{
183 Set the protocol level for this new connection.
// setProtocolLevel decides the protocol level for the connection from two
// header sets: ch, whose HK_ACCEPT_VERSION value is read, and sh, whose
// HK_VERSION value is read. connectHandler calls it with its own h argument
// as ch and the broker's connect-response headers as sh.
//
// Visible cases:
//   - both values equal and Supported: handled first (body not visible);
//   - both empty: return nil, the level stays at its SPL_10 default;
//   - both non-empty: the server's version is looked up in the client's
//     comma-separated list; EBADVERSVR is returned on one of these paths;
//   - client non-empty, server empty: nil if the client's list includes
//     SPL_10;
//   - otherwise c.protocol is set to the server's version.
//
// NOTE(review): the text of this function is fragmentary here. Several
// branch bodies, else-arms and returns are not visible, so the exact
// condition guarding each return should be confirmed against a complete
// copy.
185 func (c
*Connection
) setProtocolLevel(ch
, sh Headers
) (e error
) {
// chw: the accept-version value from ch.
186 chw
:= ch
.Value(HK_ACCEPT_VERSION
)
// shr: the version value from sh.
187 shr
:= sh
.Value(HK_VERSION
)
// Exact agreement on a supported version.
189 if chw
== shr
&& Supported(shr
) {
193 if chw
== "" && shr
== "" { // Straight up 1.0
194 return nil // protocol level defaults to SPL_10
196 cv
:= strings
.SplitN(chw
, ",", -1) // Client requested versions
198 if chw
!= "" && shr
!= "" {
// Did the server choose something the client listed?
199 if hasValue(cv
, shr
) {
201 return EBADVERSVR
// Client and server agree, but we do not support it
209 if chw
!= "" && shr
== "" { // Client asked for something, server is pure 1.0
210 if hasValue(cv
, SPL_10
) {
211 return nil // protocol level defaults to SPL_10
215 c
.protocol
= shr
// Could be anything we support
220 Internal function, used only during CONNECT processing.
222 func hasValue(a
[]string, w
string) bool {
223 for _
, v
:= range a
{