Cosmetics in code, no functionality change.
[stompngo_examples.git] / adhoc / varmGetter / varmGetter.go
bloba10a402c19085b4cf9f66de51c192df3ea2a1a35
1 //
2 // Copyright © 2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
19 Function: read the code :-).
22 package main
24 import (
25 "fmt"
26 "log"
27 "os"
28 "time"
30 "github.com/gmallard/stompngo"
31 "github.com/gmallard/stompngo/senv"
32 "github.com/gmallard/stompngo_examples/sngecomm"
// Package-level settings shared by every function in this example.
var (
	// Identification strings placed at the front of the log lines this
	// example emits.
	exampid = "varmGetter: "
	tag     = "vrmgmain"

	// ll is the example's logger: it writes to stdout with a "VRMG " prefix
	// and date plus microsecond-resolution time stamps.
	ll = log.New(os.Stdout, "VRMG ", log.Ldate|log.Lmicroseconds)

	// Run flags.  init may change these based on the environment.
	unsub  = true  // Run the UNSUBSCRIBE step (cleared when VMG_NOUNSUB is set)
	dodisc = true  // Run the disconnect sequence (cleared when VMG_NODISC is set)
	ar     = false // Want ACK RECEIPT (set when VMG_GETAR is set)
)
44 func init() {
45 if os.Getenv("VMG_NOUNSUB") != "" {
46 unsub = false
48 if os.Getenv("VMG_NODISC") != "" {
49 dodisc = false
51 if os.Getenv("VMG_GETAR") != "" {
52 ar = true
56 // Connect to a STOMP broker, subscribe and receive some messages and disconnect.
57 func main() {
59 st := time.Now()
61 // Standard example connect sequence
62 n, conn, e := sngecomm.CommonConnect(exampid, tag, ll)
63 if e != nil {
64 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
65 exampid, tag, sngecomm.Lcs,
66 e.Error()) // Handle this ......
69 conn.SetLogger(ll) // stompngo logging
70 pbc := sngecomm.Pbc() // Print byte count
71 d := senv.Dest() // Destination
72 id := stompngo.Uuid() // A unique name/id
73 nmsgs := senv.Nmsgs() // int number of messages to get
74 mns := fmt.Sprintf("%d", nmsgs) // string number of messages to get
75 am := sngecomm.AckMode() // ACK mode to use on SUBSCRIBE
76 nfa := true // Need "final" ACK (possiby reset below)
77 wh := stompngo.Headers{ // Starting SUBSCRIBE headers
78 stompngo.StompPlusDrainAfter,
79 mns} // Need a string here
81 // Sanity check ACK mode
82 if conn.Protocol() == stompngo.SPL_10 &&
83 am == stompngo.AckModeClientIndividual {
84 ll.Fatalf("%stag:%s connsess:%s invalid_ack_mode am:%v proto:%v\n",
85 exampid, tag, conn.Session(),
86 am, conn.Protocol()) //
88 // Do not do final ACK if running ACKs are issued
89 if am == stompngo.AckModeClientIndividual ||
90 am == stompngo.AckModeAuto {
91 nfa = false
94 // Show run parameters
95 ll.Printf("%stag:%s connsess:%s run_parms\n\tpbc:%v\n\td:%v\n\tid:%v\n\tnmsgs:%v\n\tam:%v\n\tnfa:%v\n\twh:%v\n",
96 exampid, tag, conn.Session(),
97 pbc, d, id, nmsgs, am, nfa, wh)
99 // Run SUBSCRIBE
100 sc := doSubscribe(conn, d, id, am, wh)
101 ll.Printf("%stag:%s connsess:%s stomp_subscribe_complete\n",
102 exampid, tag, conn.Session())
104 var md stompngo.MessageData // Message data from basic read
105 var lmd stompngo.MessageData // Possible save (copy) of received data
106 mc := 1 // Initial message number
108 // Loop for the requested number of messages
109 GetLoop:
110 for {
111 ll.Printf("%stag:%s connsess:%s start_of_read_loop mc:%v nmsgs:%v\n",
112 exampid, tag, conn.Session(), mc, nmsgs)
114 mcs := fmt.Sprintf("%d", mc) // string number message count
116 // Get something from the stompngo read routine
117 select {
118 case md = <-sc:
119 case md = <-conn.MessageData:
121 if md.Message.Command == stompngo.RECEIPT {
122 ll.Printf("%stag:%s connsess:%s have_receipt md:%v\n",
123 exampid, tag, conn.Session(),
125 continue GetLoop
127 ll.Fatalf("%stag:%s connsess:%s ERROR_frame hdrs:%v body:%v\n",
128 exampid, tag, conn.Session(),
129 md.Message.Headers, string(md.Message.Body)) // Handle this ......
132 // Save message data for possible use in the final ACK
133 if mc == nmsgs && nfa {
134 lmd = md // Save last message
137 // Basic loop logging
138 ll.Printf("%stag:%s connsess:%s channel_read_complete\n",
139 exampid, tag, conn.Session())
140 ll.Printf("%stag:%s connsess:%s message_number:%v\n",
141 exampid, tag, conn.Session(),
144 // Check if reader returned any error
145 if md.Error != nil {
146 ll.Fatalf("%stag:%s connsess:%s error_read error:%v",
147 exampid, tag, conn.Session(),
148 md.Error) // Handle this ......
151 // Show frame type
152 ll.Printf("%stag:%s connsess:%s frame_type cmd:%s\n",
153 exampid, tag, conn.Session(),
154 md.Message.Command)
156 // Pure sanity check: this should *never* happen based on logic
157 // above.
158 if md.Message.Command != stompngo.MESSAGE {
159 ll.Fatalf("%stag:%s connsess:%s error_frame_type md:%v",
160 exampid, tag, conn.Session(),
161 md) // Handle this ......
164 // Show Message Headers
165 wh := md.Message.Headers
166 for j := 0; j < len(wh)-1; j += 2 {
167 ll.Printf("%stag:%s connsess:%s Header:%s:%s\n",
168 exampid, tag, conn.Session(),
169 wh[j], wh[j+1])
171 // Show (part of) Message Body
172 if pbc > 0 {
173 maxlen := pbc
174 if len(md.Message.Body) < maxlen {
175 maxlen = len(md.Message.Body)
177 ss := string(md.Message.Body[0:maxlen])
178 ll.Printf("%stag:%s connsess:%s payload body:%s\n",
179 exampid, tag, conn.Session(),
183 // Run individual ACK if required
184 if am == stompngo.AckModeClientIndividual {
185 wh := md.Message.Headers // Copy Headers
186 if ar { // ACK receipt wanted
187 wh = wh.Add(stompngo.HK_RECEIPT, "rwanted-"+mcs)
189 sngecomm.HandleAck(conn, wh, id)
190 ll.Printf("%stag:%s connsess:%s individual_ack_complete mc:%v headers:%v\n",
191 exampid, tag, conn.Session(),
192 mc, md.Message.Headers)
196 // Check for end of loop condition
197 if mc == nmsgs {
198 break
201 // Increment loop/message counter
202 mc++
205 // Issue the final ACK if needed
206 if nfa {
207 wh := lmd.Message.Headers // Copy Headers
208 if ar { // ACK receipt wanted
209 wh = wh.Add(stompngo.HK_RECEIPT, "rwanted-fin")
211 sngecomm.HandleAck(conn, wh, id)
212 ll.Printf("%stag:%s connsess:%s final_ack_complete\n",
213 exampid, tag, conn.Session())
214 if ar {
215 getReceipt(conn)
219 // Unsubscribe (may be skipped if requested)
220 if unsub {
221 sngecomm.HandleUnsubscribe(conn, d, id)
222 ll.Printf("%stag:%s connsess:%s stomp_unsubscribe_complete\n",
223 exampid, tag, conn.Session())
224 } else {
225 ll.Printf("%stag:%s connsess:%s skipping_unsubscribe\n",
226 exampid, tag, conn.Session())
229 // Standard example disconnect sequence (may be skipped if requested)
230 // a) Closes the *stompngo.Connection
231 // b) Closes the net.Conn
232 if dodisc {
233 e = sngecomm.CommonDisconnect(n, conn, exampid, tag, ll)
234 if e != nil {
235 ll.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
236 exampid, tag, conn.Session(),
237 e.Error()) // Handle this ......
239 ll.Printf("%stag:%s connsess:%s disconnect_receipt:%v\n",
240 exampid, tag, conn.Session(),
241 conn.DisconnectReceipt)
242 } else {
243 ll.Printf("%stag:%s connsess:%s skipping_disconnect\n",
244 exampid, tag, conn.Session())
247 // End of work logging, show elapsed time
248 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
249 exampid, tag, conn.Session(),
250 time.Now().Sub(st))
253 // Handle a subscribe for the different protocol levels.
254 func doSubscribe(c *stompngo.Connection, d, id, a string, h stompngo.Headers) <-chan stompngo.MessageData {
255 h = h.Add("destination", d).Add("ack", a)
257 switch c.Protocol() {
258 case stompngo.SPL_12:
259 // Add required id header
260 h = h.Add("id", id)
261 case stompngo.SPL_11:
262 // Add required id header
263 h = h.Add("id", id)
264 case stompngo.SPL_10:
265 // Nothing else to do here
266 default:
267 ll.Fatalf("v1:%v\n", "subscribe invalid protocol level, should not happen")
270 r, e := c.Subscribe(h)
271 if e != nil {
272 ll.Fatalf("subscribe failed err:[%v]\n", e)
274 return r
277 // Get receipt
278 func getReceipt(conn *stompngo.Connection) {
279 rd := <-conn.MessageData
280 ll.Printf("%stag:%s connsess:%s have_receipt_sub md:%v\n",
281 exampid, tag, conn.Session(),