Adjust per latest 'publish' changes.
[stompngo_examples.git] / adhoc / varmGetter / noPackMod / noPMod1 / noPMod1.go
blobaf8d4c08b79f8fa8e033fe0da7ba617ad4f438f9
1 //
2 // Copyright © 2016-2018 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
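19 Function: connect to a STOMP broker and, for each queue number qn from 1 through sngecomm.Nqs(), subscribe, read qn messages, check each payload, ACK as the ACK mode requires, drain the subscription and unsubscribe; finally disconnect.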
22 package main
24 import (
25 "fmt"
26 "log"
27 "os"
28 "time"
30 "github.com/gmallard/stompngo"
31 "github.com/gmallard/stompngo/senv"
32 "github.com/gmallard/stompngo_examples/sngecomm"
// Package-level state shared by main, runNextQueue and the helper functions.
// NOTE(review): this listing appears to have lost short lines (for example the
// closing parenthesis of this block) - confirm against the repository copy.
35 var (
// exampid is the first field of every log line written by this example.
36 exampid = "varmGetter: "
// ll is the logger used throughout; it writes to stdout with date and
// microsecond timestamps.
37 ll = log.New(os.Stdout, "VRMGSC ", log.Ldate|log.Lmicroseconds)
// tag is logged after exampid on every log line.
38 tag = "vrmgmain"
// unsub controls whether runNextQueue unsubscribes; init clears it when
// VMG_NOUNSUB is set to a non-empty value.
39 unsub = true
// dodisc controls whether main disconnects; init clears it when VMG_NODISC
// is set to a non-empty value.
40 dodisc = true
// ar requests a receipt header on ACKs; init sets it when VMG_GETAR is set
// to a non-empty value.
41 ar = false // Want ACK RECEIPT
// session is filled in by main from conn.Session() and logged as connsess.
42 session = ""
// NOTE(review): qcb is not referenced anywhere in the visible code.
43 qcb []chan bool
// wlp is the left part of the expected message body; runNextQueue appends
// the message number to it before comparing.
44 wlp = "publish: message: "
47 func init() {
48 if os.Getenv("VMG_NOUNSUB") != "" {
49 unsub = false
51 if os.Getenv("VMG_NODISC") != "" {
52 dodisc = false
54 if os.Getenv("VMG_GETAR") != "" {
55 ar = true
59 // Connect to a STOMP broker, subscribe and receive some messages and disconnect.
60 func main() {
62 st := time.Now()
64 // Standard example connect sequence
65 n, conn, e := sngecomm.CommonConnect(exampid, tag, ll)
66 if e != nil {
67 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
68 exampid, tag, sngecomm.Lcs,
69 e.Error()) // Handle this ......
71 session = conn.Session()
72 //******************
73 nqs := sngecomm.Nqs()
74 for qn := 1; qn <= nqs; qn++ {
75 runNextQueue(qn, conn)
77 //******************
79 // Standard example disconnect sequence
80 if dodisc {
81 e = sngecomm.CommonDisconnect(n, conn, exampid, tag, ll)
82 if e != nil {
83 ll.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
84 exampid, tag, session,
85 e.Error()) // Handle this ......
87 ll.Printf("%stag:%s connsess:%s disconnect_receipt:%v\n",
88 exampid, tag, session,
89 conn.DisconnectReceipt)
90 } else {
91 ll.Printf("%stag:%s connsess:%s skipping_disconnect\n",
92 exampid, tag, session)
95 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
96 exampid, tag, session,
97 time.Now().Sub(st))
// runNextQueue processes one queue on an already connected session.
//
// It subscribes to the destination senv.Dest() + qn + ".1", then reads
// messages until the message counter reaches qn (the number of messages to
// get equals the queue number). Every message body must equal wlp followed
// by the message number; a mismatch, a read error, or a frame that is not a
// MESSAGE ends the program through ll.Fatalf.
//
// ACK handling depends on the ACK mode returned by sngecomm.AckMode(): in
// client-individual mode each message is ACKed as it is read; in auto mode
// this function issues no ACK; otherwise one final ACK is sent using the
// headers of the last message. When ar is set a receipt header is added to
// the ACKs.
//
// After the read loop a drainSub goroutine is started and waited for, and
// finally the subscription is removed unless unsub has been cleared.
//
// NOTE(review): this listing appears to have lost short lines (closing
// braces and some trailing Printf arguments) - confirm against the
// repository copy before relying on exact block boundaries.
100 func runNextQueue(qn int, conn *stompngo.Connection) {
102 qns := fmt.Sprintf("%d", qn) // string number of the queue
103 conn.SetLogger(ll) // stompngo logging
104 pbc := sngecomm.Pbc() // Print byte count
105 d := senv.Dest() + qns + ".1" // Destination
106 id := stompngo.Uuid() // A unique name/id
107 nmsgs := qn // int number of messages to get, same as queue number
108 am := sngecomm.AckMode() // ACK mode to use on SUBSCRIBE
109 nfa := true // Need "final" ACK (possibly reset below)
110 wh := stompngo.Headers{} // Starting SUBSCRIBE headers
112 // Sanity check ACK mode
// Client-individual ACK mode is rejected when the connection protocol is
// SPL_10.
113 if conn.Protocol() == stompngo.SPL_10 &&
114 am == stompngo.AckModeClientIndividual {
115 ll.Fatalf("%stag:%s connsess:%s invalid_ack_mode am:%v proto:%v\n",
116 exampid, tag, session,
117 am, conn.Protocol()) //
119 // Do not do final ACK if running ACKs are issued
120 if am == stompngo.AckModeClientIndividual ||
121 am == stompngo.AckModeAuto {
122 nfa = false
125 // Show run parameters
126 ll.Printf("%stag:%s connsess:%s run_parms\n\tqns:%v\n\tpbc:%v\n\td:%v\n\tid:%v\n\tnmsgs:%v\n\tam:%v\n\tnfa:%v\n\twh:%v\n",
127 exampid, tag, session,
128 qns, pbc, d, id, nmsgs, am, nfa, wh)
130 // Run SUBSCRIBE
131 sc := doSubscribe(conn, d, id, am, wh)
132 ll.Printf("%stag:%s connsess:%s stomp_subscribe_complete\n",
133 exampid, tag, session)
135 var md stompngo.MessageData // Message data from basic read
136 var lmd stompngo.MessageData // Possible save (copy) of received data
137 mc := 1 // Initial message number
139 // Loop for the requested number of messages
140 GetLoop:
141 for {
142 ll.Printf("%stag:%s connsess:%s start_of_read_loop mc:%v nmsgs:%v\n",
143 exampid, tag, session, mc, nmsgs)
145 mcs := fmt.Sprintf("%d", mc) // string number message count
147 // Get something from the stompngo read routine
// Frames are taken from either the subscription channel sc or from
// conn.MessageData. A RECEIPT frame is logged and the loop restarts without
// advancing the message counter.
// NOTE(review): the ERROR_frame Fatalf below presumably sits inside the
// conn.MessageData case, after the RECEIPT check - confirm.
148 select {
149 case md = <-sc:
150 case md = <-conn.MessageData:
152 if md.Message.Command == stompngo.RECEIPT {
153 ll.Printf("%stag:%s connsess:%s have_receipt md:%v\n",
154 exampid, tag, session,
156 continue GetLoop
158 ll.Fatalf("%stag:%s connsess:%s ERROR_frame hdrs:%v body:%v\n",
159 exampid, tag, session,
160 md.Message.Headers, string(md.Message.Body)) // Handle this ......
163 // Save message data for possible use in the final ACK
164 if mc == nmsgs && nfa {
165 lmd = md // Save last message
168 // Basic loop logging
169 ll.Printf("%stag:%s connsess:%s channel_read_complete qn:%d mc:%d\n",
170 exampid, tag, session,
171 qn, mc)
172 ll.Printf("%stag:%s connsess:%s message_number:%v\n",
173 exampid, tag, session,
176 // Check if reader returned any error
177 if md.Error != nil {
178 ll.Fatalf("%stag:%s connsess:%s error_read error:%v",
179 exampid, tag, session,
180 md.Error) // Handle this ......
183 // Show frame type
184 ll.Printf("%stag:%s connsess:%s frame_type cmd:%s\n",
185 exampid, tag, session,
186 md.Message.Command)
188 // Pure sanity check: this should *never* happen based on logic
189 // above.
190 if md.Message.Command != stompngo.MESSAGE {
191 ll.Fatalf("%stag:%s connsess:%s error_frame_type md:%v",
192 exampid, tag, session,
193 md) // Handle this ......
196 // Show Message Headers
// The headers are walked two elements at a time and logged as key:value.
// This wh is a new variable that shadows the SUBSCRIBE headers declared
// above.
197 wh := md.Message.Headers
198 for j := 0; j < len(wh)-1; j += 2 {
199 ll.Printf("%stag:%s connsess:%s Header:%s:%s\n",
200 exampid, tag, session,
201 wh[j], wh[j+1])
203 // Show (part of) Message Body
// At most pbc bytes of the body are logged; nothing is logged when pbc is
// not positive.
204 if pbc > 0 {
205 maxlen := pbc
206 if len(md.Message.Body) < maxlen {
207 maxlen = len(md.Message.Body)
209 ss := string(md.Message.Body[0:maxlen])
210 ll.Printf("%stag:%s connsess:%s payload body:%s\n",
211 exampid, tag, session,
215 // Sanity check this message payload
217 wm := wlp + mcs // The left part plus the (string) message number
218 bm := string(md.Message.Body)
219 if bm != wm {
220 ll.Fatalf("%stag:%s connsess:%s error_message_payload\n\tGot %s\n\tWant%s\n",
221 exampid, tag, session,
222 bm, wm) // Handle this ......
223 } else {
224 ll.Printf("%stag:%s connsess:%s matched_body_string\n%s\n%s\n",
225 exampid, tag, session,
226 bm, wm) // Handle this ......)
230 // Run individual ACK if required
231 if am == stompngo.AckModeClientIndividual {
232 wh := md.Message.Headers // Copy Headers
233 if ar { // ACK receipt wanted
234 wh = wh.Add(stompngo.HK_RECEIPT, "rwanted-"+mcs)
236 sngecomm.HandleAck(conn, wh, id)
237 ll.Printf("%stag:%s connsess:%s individual_ack_complete mc:%v headers:%v\n",
238 exampid, tag, session,
239 mc, md.Message.Headers)
243 // Check for end of loop condition
244 if mc == nmsgs {
245 break
248 // Increment loop/message counter
249 mc++
// Start a drainSub goroutine on the subscription channel; it sends on qc
// when it is done. dd records whether that signal has already been received
// in the ACK receipt path below, so that it is not waited for a second time.
252 qc := make(chan bool)
253 go drainSub(session, sc, qc, qn)
254 dd := false
256 // Issue the final ACK if needed
257 if nfa {
258 wh := lmd.Message.Headers // Copy Headers
259 if ar { // ACK receipt wanted
260 wh = wh.Add(stompngo.HK_RECEIPT, "rwanted-fin")
262 sngecomm.HandleAck(conn, wh, id)
263 ll.Printf("%stag:%s connsess:%s final_ack_complete\n",
264 exampid, tag, session)
// When a receipt was requested, wait for the drain to finish first and then
// read the receipt with getReceipt.
265 if ar {
266 ll.Printf("%stag:%s connsess:%s ack_receive_wait_for_drain qn:%d\n",
267 exampid, tag, session,
269 <-qc // Wait for drainSub
270 ll.Printf("%stag:%s connsess:%s ack_receive_drain_complete qn:%d\n",
271 exampid, tag, session,
273 dd = true
274 getReceipt(conn)
// Wait for drainSub here if it was not already waited for above.
278 if !dd {
279 ll.Printf("%stag:%s connsess:%s message_loop_wait_for_drain qn:%d\n",
280 exampid, tag, session,
282 <-qc
283 ll.Printf("%stag:%s connsess:%s message_loop_drain_complete qn:%d\n",
284 exampid, tag, session,
288 // Unsubscribe (may be skipped if requested)
289 if unsub {
290 sngecomm.HandleUnsubscribe(conn, d, id)
291 ll.Printf("%stag:%s connsess:%s stomp_unsubscribe_complete\n",
292 exampid, tag, session)
293 } else {
294 ll.Printf("%stag:%s connsess:%s skipping_unsubscribe\n",
295 exampid, tag, session)
299 // Handle a subscribe for the different protocol levels.
300 func doSubscribe(c *stompngo.Connection, d, id, a string, h stompngo.Headers) <-chan stompngo.MessageData {
301 h = h.Add("destination", d).Add("ack", a)
303 switch c.Protocol() {
304 case stompngo.SPL_12:
305 // Add required id header
306 h = h.Add("id", id)
307 case stompngo.SPL_11:
308 // Add required id header
309 h = h.Add("id", id)
310 case stompngo.SPL_10:
311 // Nothing else to do here
312 default:
313 ll.Fatalf("v1:%v\n", "subscribe invalid protocol level, should not happen")
316 r, e := c.Subscribe(h)
317 if e != nil {
318 ll.Fatalf("subscribe failed err:[%v]\n", e)
320 return r
323 // Get receipt
324 func getReceipt(conn *stompngo.Connection) {
325 rd := <-conn.MessageData
326 ll.Printf("%stag:%s connsess:%s have_receipt_sub md:%v\n",
327 exampid, tag, session,
331 // Drain a subscription
332 func drainSub(s string, sc <-chan stompngo.MessageData, dc chan bool, qn int) {
333 ll.Printf("%stag:%s connsess:%s drain_starts qn:%d\n",
334 exampid, tag, s,
336 tmr := time.NewTimer(1 * time.Second) // Time to wait, YMMV
337 q := false
338 for {
339 select {
340 case md := <-sc:
341 ll.Printf("%stag:%s connsess:%s drained md:%v\n",
342 exampid, tag, s,
344 case _ = <-tmr.C:
345 q = true
347 if q {
348 dc <- true
349 break
352 ll.Printf("%stag:%s connsess:%s drain_ends qn:%d\n",
353 exampid, tag, s,