2 // Copyright © 2011-2019 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
25 // Exported Connection methods
28 Connected returns the current connection status.
30 func (c
*Connection
) Connected() bool {
35 Session returns the broker assigned session id.
37 func (c
*Connection
) Session() string {
42 Protocol returns the current connection protocol level.
44 func (c
*Connection
) Protocol() string {
46 defer c
.protoLock
.Unlock()
51 SetLogger enables a client defined logger for this connection.
53 Set to "nil" to disable logging.
57 l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
60 func (c
*Connection
) SetLogger(l
*log
.Logger
) {
67 GetLogger - returns the current connection logger.
69 func (c
*Connection
) GetLogger() *log
.Logger
{
74 SendTickerInterval returns any heartbeat send ticker interval in ms. A return
75 value of zero means no heartbeats are being sent.
77 func (c
*Connection
) SendTickerInterval() int64 {
81 return c
.hbd
.sti
/ 1000000
85 ReceiveTickerInterval returns any heartbeat receive ticker interval in ms.
86 A return value of zero means no heartbeats are being received.
88 func (c
*Connection
) ReceiveTickerInterval() int64 {
92 return c
.hbd
.rti
/ 1000000
96 SendTickerCount returns any heartbeat send ticker count. A return value of
97 zero usually indicates no send heartbeats are enabled.
99 func (c
*Connection
) SendTickerCount() int64 {
107 ReceiveTickerCount returns any heartbeat receive ticker count. A return
108 value of zero usually indicates no read heartbeats are enabled.
110 func (c
*Connection
) ReceiveTickerCount() int64 {
118 FramesRead returns a count of the number of frames read on the connection.
120 func (c
*Connection
) FramesRead() int64 {
125 BytesRead returns a count of the number of bytes read on the connection.
127 func (c
*Connection
) BytesRead() int64 {
132 FramesWritten returns a count of the number of frames written on the connection.
134 func (c
*Connection
) FramesWritten() int64 {
139 BytesWritten returns a count of the number of bytes written on the connection.
141 func (c
*Connection
) BytesWritten() int64 {
146 Running returns a time duration since connection start.
148 func (c
*Connection
) Running() time
.Duration
{
149 return time
.Since(c
.mets
.st
)
153 SubChanCap returns the current subscribe channel capacity.
155 func (c
*Connection
) SubChanCap() int {
160 SetSubChanCap sets a new subscribe channel capacity, to be used during future
161 SUBSCRIBE operations.
163 func (c
*Connection
) SetSubChanCap(nc
int) {
168 // Unexported Connection methods
171 Log data if possible.
173 func (c
*Connection
) log(v
...interface{}) {
175 defer logLock
.Unlock()
179 _
, fn
, ld
, ok
:= runtime
.Caller(1)
182 c
.logger
.Printf("%s %s %d %v\n", c
.session
, fn
, ld
, v
)
184 c
.logger
.Printf("%s %v\n", c
.session
, v
)
190 Log data if possible (extended / abbreviated) logic).
192 func (c
*Connection
) logx(v
...interface{}) {
193 _
, fn
, ld
, ok
:= runtime
.Caller(1)
197 c
.logger
.Printf("%s %s %d %v\n", c
.session
, fn
, ld
, v
)
199 c
.logger
.Printf("%s %v\n", c
.session
, v
)
208 func (c
*Connection
) shutdownHeartBeats() {
209 // Shutdown heartbeats if necessary
228 func (c
*Connection
) shutdown() {
229 c
.log("SHUTDOWN", "starts")
230 c
.shutdownHeartBeats()
231 // Close all individual subscribe channels
232 // This is a write lock
234 for key
:= range c
.subs
{
235 close(c
.subs
[key
].md
)
236 c
.subs
[key
].cs
= true
242 c
.log("SHUTDOWN", "ends")
247 Connection Abort logic. Shutdown connection system on problem happens
249 func (c
*Connection
) sysAbort() {
250 c
.abortOnce
.Do(func() { close(c
.ssdc
) })
257 func (c
*Connection
) handleReadError(md MessageData
) {
258 c
.log("HDRERR", "starts", md
)
259 c
.shutdownHeartBeats() // We are done here
260 // Notify any general subscriber of error
265 // Notify all individual subscribers of error
266 // This is a read lock
269 for key
:= range c
.subs
{
274 // Try to catch the writer
276 c
.log("HDRERR", "ends")
277 // Let further shutdown logic proceed normally.