Address issue #50.
[stompngo.git] / connection.go
blobfe4b7c5fad8818bf4e05bce3405c704a338d5148
1 //
2 // Copyright © 2011-2019 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "log"
21 "runtime"
22 "time"
25 // Exported Connection methods
28 Connected returns the current connection status.
30 func (c *Connection) Connected() bool {
31 return c.connected
35 Session returns the broker assigned session id.
37 func (c *Connection) Session() string {
38 return c.session
42 Protocol returns the current connection protocol level.
44 func (c *Connection) Protocol() string {
45 c.protoLock.Lock()
46 defer c.protoLock.Unlock()
47 return c.protocol
51 SetLogger enables a client defined logger for this connection.
53 Set to "nil" to disable logging.
55 Example:
56 // Start logging
57 l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
58 c.SetLogger(l)
60 func (c *Connection) SetLogger(l *log.Logger) {
61 logLock.Lock()
62 c.logger = l
63 logLock.Unlock()
67 GetLogger - returns the current connection logger.
69 func (c *Connection) GetLogger() *log.Logger {
70 return c.logger
74 SendTickerInterval returns any heartbeat send ticker interval in ms. A return
75 value of zero means no heartbeats are being sent.
77 func (c *Connection) SendTickerInterval() int64 {
78 if c.hbd == nil {
79 return 0
81 return c.hbd.sti / 1000000
85 ReceiveTickerInterval returns any heartbeat receive ticker interval in ms.
86 A return value of zero means no heartbeats are being received.
88 func (c *Connection) ReceiveTickerInterval() int64 {
89 if c.hbd == nil {
90 return 0
92 return c.hbd.rti / 1000000
96 SendTickerCount returns any heartbeat send ticker count. A return value of
97 zero usually indicates no send heartbeats are enabled.
99 func (c *Connection) SendTickerCount() int64 {
100 if c.hbd == nil {
101 return 0
103 return c.hbd.sc
107 ReceiveTickerCount returns any heartbeat receive ticker count. A return
108 value of zero usually indicates no read heartbeats are enabled.
110 func (c *Connection) ReceiveTickerCount() int64 {
111 if c.hbd == nil {
112 return 0
114 return c.hbd.rc
118 FramesRead returns a count of the number of frames read on the connection.
120 func (c *Connection) FramesRead() int64 {
121 return c.mets.tfr
125 BytesRead returns a count of the number of bytes read on the connection.
127 func (c *Connection) BytesRead() int64 {
128 return c.mets.tbr
132 FramesWritten returns a count of the number of frames written on the connection.
134 func (c *Connection) FramesWritten() int64 {
135 return c.mets.tfw
139 BytesWritten returns a count of the number of bytes written on the connection.
141 func (c *Connection) BytesWritten() int64 {
142 return c.mets.tbw
146 Running returns a time duration since connection start.
148 func (c *Connection) Running() time.Duration {
149 return time.Since(c.mets.st)
153 SubChanCap returns the current subscribe channel capacity.
155 func (c *Connection) SubChanCap() int {
156 return c.scc
160 SetSubChanCap sets a new subscribe channel capacity, to be used during future
161 SUBSCRIBE operations.
163 func (c *Connection) SetSubChanCap(nc int) {
164 c.scc = nc
165 return
168 // Unexported Connection methods
171 Log data if possible.
173 func (c *Connection) log(v ...interface{}) {
174 logLock.Lock()
175 defer logLock.Unlock()
176 if c.logger == nil {
177 return
179 _, fn, ld, ok := runtime.Caller(1)
181 if ok {
182 c.logger.Printf("%s %s %d %v\n", c.session, fn, ld, v)
183 } else {
184 c.logger.Printf("%s %v\n", c.session, v)
186 return
190 Log data if possible (extended / abbreviated) logic).
192 func (c *Connection) logx(v ...interface{}) {
193 _, fn, ld, ok := runtime.Caller(1)
195 c.sessLock.Lock()
196 if ok {
197 c.logger.Printf("%s %s %d %v\n", c.session, fn, ld, v)
198 } else {
199 c.logger.Printf("%s %v\n", c.session, v)
201 c.sessLock.Unlock()
202 return
206 Shutdown heartbeats
208 func (c *Connection) shutdownHeartBeats() {
209 // Shutdown heartbeats if necessary
210 if c.hbd != nil {
211 c.hbd.clk.Lock()
212 if !c.hbd.ssdn {
213 if c.hbd.hbs {
214 close(c.hbd.ssd)
216 if c.hbd.hbr {
217 close(c.hbd.rsd)
219 c.hbd.ssdn = true
221 c.hbd.clk.Unlock()
226 Shutdown logic.
228 func (c *Connection) shutdown() {
229 c.log("SHUTDOWN", "starts")
230 c.shutdownHeartBeats()
231 // Close all individual subscribe channels
232 // This is a write lock
233 c.subsLock.Lock()
234 for key := range c.subs {
235 close(c.subs[key].md)
236 c.subs[key].cs = true
238 c.connLock.Lock()
239 c.connected = false
240 c.connLock.Unlock()
241 c.subsLock.Unlock()
242 c.log("SHUTDOWN", "ends")
243 return
247 Connection Abort logic. Shutdown connection system on problem happens
249 func (c *Connection) sysAbort() {
250 c.abortOnce.Do(func() { close(c.ssdc) })
251 return
255 Read error handler.
257 func (c *Connection) handleReadError(md MessageData) {
258 c.log("HDRERR", "starts", md)
259 c.shutdownHeartBeats() // We are done here
260 // Notify any general subscriber of error
261 select {
262 case c.input <- md:
263 default:
265 // Notify all individual subscribers of error
266 // This is a read lock
267 c.subsLock.RLock()
268 if c.connected {
269 for key := range c.subs {
270 c.subs[key].md <- md
273 c.subsLock.RUnlock()
274 // Try to catch the writer
275 close(c.wtrsdc)
276 c.log("HDRERR", "ends")
277 // Let further shutdown logic proceed normally.
278 return