A correction to the previous commit.
[stompngo.git] / heartbeats.go
blob782ac54adb73644d05773defed47b08a62b1ef6c
1 //
2 // Copyright © 2011-2019 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "fmt"
21 "strconv"
22 "strings"
23 "time"
27 Initialize heart beats if necessary and possible.
29 Return an error, possibly nil, to mainline if initialization can not
30 complete. Establish heartbeat send and receive goroutines as necessary.
32 func (c *Connection) initializeHeartBeats(ch Headers) (e error) {
33 // Client wants Heartbeats ?
34 vc, ok := ch.Contains(HK_HEART_BEAT)
35 if !ok || vc == "0,0" {
36 return nil
38 // Server wants Heartbeats ?
39 vs, ok := c.ConnectResponse.Headers.Contains(HK_HEART_BEAT)
40 if !ok || vs == "0,0" {
41 return nil
43 // Work area, may or may not become connection heartbeat data
44 w := &heartBeatData{cx: 0, cy: 0, sx: 0, sy: 0,
45 hbs: true, hbr: true, // possible reset later
46 sti: 0, rti: 0,
47 ls: 0, lr: 0}
49 // Client specified values
50 cp := strings.Split(vc, ",")
51 if len(cp) != 2 { // S/B caught by the server first
52 return Error("invalid client heart-beat header: " + vc)
54 w.cx, e = strconv.ParseInt(cp[0], 10, 64)
55 if e != nil {
56 return Error("non-numeric cx heartbeat value: " + cp[0])
58 w.cy, e = strconv.ParseInt(cp[1], 10, 64)
59 if e != nil {
60 return Error("non-numeric cy heartbeat value: " + cp[1])
63 // Server specified values
64 sp := strings.Split(vs, ",")
65 if len(sp) != 2 {
66 return Error("invalid server heart-beat header: " + vs)
68 w.sx, e = strconv.ParseInt(sp[0], 10, 64)
69 if e != nil {
70 return Error("non-numeric sx heartbeat value: " + sp[0])
72 w.sy, e = strconv.ParseInt(sp[1], 10, 64)
73 if e != nil {
74 return Error("non-numeric sy heartbeat value: " + sp[1])
77 // Check for sending needed
78 if w.cx == 0 || w.sy == 0 {
79 w.hbs = false //
82 // Check for receiving needed
83 if w.sx == 0 || w.cy == 0 {
84 w.hbr = false //
87 // ========================================================================
89 if !w.hbs && !w.hbr {
90 return nil // none required
93 // ========================================================================
95 c.hbd = w // OK, we are doing some kind of heartbeating
96 ct := time.Now().UnixNano() // Prime current time
98 if w.hbs { // Finish sender parameters if required
99 sm := max(w.cx, w.sy) // ticker interval, ms
100 w.sti = 1000000 * sm // ticker interval, ns
101 w.ssd = make(chan struct{}) // add shutdown channel
102 w.ls = ct // Best guess at start
103 // fmt.Println("start send ticker")
104 go c.sendTicker()
107 if w.hbr { // Finish receiver parameters if required
108 rm := max(w.sx, w.cy) // ticker interval, ms
109 w.rti = 1000000 * rm // ticker interval, ns
110 w.rsd = make(chan struct{}) // add shutdown channel
111 w.lr = ct // Best guess at start
112 // fmt.Println("start receive ticker")
113 go c.receiveTicker()
115 return nil
119 The heart beat send ticker.
121 func (c *Connection) sendTicker() {
122 c.hbd.sc = 0
123 ticker := time.NewTicker(time.Duration(c.hbd.sti))
124 defer ticker.Stop()
125 hbSend:
126 for {
127 select {
128 case <-ticker.C:
129 c.log("HeartBeat Send data")
130 // Send a heartbeat
131 f := Frame{"\n", Headers{}, NULLBUFF} // Heartbeat frame
132 r := make(chan error)
133 if e := c.writeWireData(wiredata{f, r}); e != nil {
134 c.Hbsf = true
135 break hbSend
137 e := <-r
139 c.hbd.sdl.Lock()
140 if e != nil {
141 fmt.Printf("Heartbeat Send Failure: %v\n", e)
142 c.Hbsf = true
143 } else {
144 c.Hbsf = false
145 c.hbd.sc++
147 c.hbd.sdl.Unlock()
149 case _ = <-c.hbd.ssd:
150 break hbSend
151 case _ = <-c.ssdc:
152 break hbSend
153 } // End of select
154 } // End of for
155 c.log("Heartbeat Send Ends", time.Now())
156 return
160 The heart beat receive ticker.
162 func (c *Connection) receiveTicker() {
163 c.hbd.rc = 0
164 var first, last, nd int64
165 hbGet:
166 for {
167 nd = c.hbd.rti - (last - first)
168 // Check if receives are supposed to be "fast" *and* we spent a
169 // lot of time in the previous loop.
170 if nd <= 0 {
171 nd = c.hbd.rti
173 ticker := time.NewTicker(time.Duration(nd))
174 select {
175 case ct := <-ticker.C:
176 first = time.Now().UnixNano()
177 ticker.Stop()
178 c.hbd.rdl.Lock()
179 flr := c.hbd.lr
180 ld := ct.UnixNano() - flr
181 c.log("HeartBeat Receive TIC", "TickerVal", ct.UnixNano(),
182 "LastReceive", flr, "Diff", ld)
183 if ld > (c.hbd.rti + (c.hbd.rti / 5)) { // swag plus to be tolerant
184 c.log("HeartBeat Receive Read is dirty")
185 c.Hbrf = true // Flag possible dirty connection
186 } else {
187 c.Hbrf = false // Reset
188 c.hbd.rc++
190 c.hbd.rdl.Unlock()
191 last = time.Now().UnixNano()
192 case _ = <-c.hbd.rsd:
193 ticker.Stop()
194 break hbGet
195 case _ = <-c.ssdc:
196 ticker.Stop()
197 break hbGet
198 } // End of select
199 } // End of for
200 c.log("Heartbeat Receive Ends", time.Now())
201 return