2 // Copyright © 2016-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
19 Function: read the code :-).
30 "github.com/gmallard/stompngo"
31 "github.com/gmallard/stompngo/senv"
32 "github.com/gmallard/stompngo_examples/sngecomm"
36 exampid
= "varmGetter: "
37 ll
= log
.New(os
.Stdout
, "VRMGSC ", log
.Ldate|log
.Lmicroseconds
)
41 ar
= false // Want ACK RECEIPT
44 wlp
= "publish: message: "
48 if os
.Getenv("VMG_NOUNSUB") != "" {
51 if os
.Getenv("VMG_NODISC") != "" {
54 if os
.Getenv("VMG_GETAR") != "" {
59 // Connect to a STOMP broker, subscribe and receive some messages and disconnect.
64 // Standard example connect sequence
65 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, tag
, ll
)
67 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
68 exampid
, tag
, sngecomm
.Lcs
,
69 e
.Error()) // Handle this ......
71 session
= conn
.Session()
74 for qn
:= 1; qn
<= nqs
; qn
++ {
75 runNextQueue(qn
, conn
)
79 // Standard example disconnect sequence
81 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, tag
, ll
)
83 ll
.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
84 exampid
, tag
, session
,
85 e
.Error()) // Handle this ......
87 ll
.Printf("%stag:%s connsess:%s disconnect_receipt:%v\n",
88 exampid
, tag
, session
,
89 conn
.DisconnectReceipt
)
91 ll
.Printf("%stag:%s connsess:%s skipping_disconnect\n",
92 exampid
, tag
, session
)
95 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
96 exampid
, tag
, session
,
100 func runNextQueue(qn
int, conn
*stompngo
.Connection
) {
102 qns
:= fmt
.Sprintf("%d", qn
) // string number of the queue
103 conn
.SetLogger(ll
) // stompngo logging
104 pbc
:= sngecomm
.Pbc() // Print byte count
105 d
:= senv
.Dest() + qns
+ ".1" // Destination
106 id
:= stompngo
.Uuid() // A unique name/id
107 nmsgs
:= qn
// int number of messages to get, same as queue number
108 am
:= sngecomm
.AckMode() // ACK mode to use on SUBSCRIBE
109 nfa
:= true // Need "final" ACK (possibly reset below)
110 wh
:= stompngo
.Headers
{} // Starting SUBSCRIBE headers
112 // Sanity check ACK mode
113 if conn
.Protocol() == stompngo
.SPL_10
&&
114 am
== stompngo
.AckModeClientIndividual
{
115 ll
.Fatalf("%stag:%s connsess:%s invalid_ack_mode am:%v proto:%v\n",
116 exampid
, tag
, session
,
117 am
, conn
.Protocol()) //
119 // Do not do final ACK if running ACKs are issued
120 if am
== stompngo
.AckModeClientIndividual ||
121 am
== stompngo
.AckModeAuto
{
125 // Show run parameters
126 ll
.Printf("%stag:%s connsess:%s run_parms\n\tqns:%v\n\tpbc:%v\n\td:%v\n\tid:%v\n\tnmsgs:%v\n\tam:%v\n\tnfa:%v\n\twh:%v\n",
127 exampid
, tag
, session
,
128 qns
, pbc
, d
, id
, nmsgs
, am
, nfa
, wh
)
131 sc
:= doSubscribe(conn
, d
, id
, am
, wh
)
132 ll
.Printf("%stag:%s connsess:%s stomp_subscribe_complete\n",
133 exampid
, tag
, session
)
135 var md stompngo
.MessageData
// Message data from basic read
136 var lmd stompngo
.MessageData
// Possible save (copy) of received data
137 mc
:= 1 // Initial message number
139 // Loop for the requested number of messages
142 ll
.Printf("%stag:%s connsess:%s start_of_read_loop mc:%v nmsgs:%v\n",
143 exampid
, tag
, session
, mc
, nmsgs
)
145 mcs
:= fmt
.Sprintf("%d", mc
) // string number message count
147 // Get something from the stompngo read routine
150 case md
= <-conn
.MessageData
:
152 if md
.Message
.Command
== stompngo
.RECEIPT
{
153 ll
.Printf("%stag:%s connsess:%s have_receipt md:%v\n",
154 exampid
, tag
, session
,
158 ll
.Fatalf("%stag:%s connsess:%s ERROR_frame hdrs:%v body:%v\n",
159 exampid
, tag
, session
,
160 md
.Message
.Headers
, string(md
.Message
.Body
)) // Handle this ......
163 // Save message data for possible use in the final ACK
164 if mc
== nmsgs
&& nfa
{
165 lmd
= md
// Save last message
168 // Basic loop logging
169 ll
.Printf("%stag:%s connsess:%s channel_read_complete qn:%d mc:%d\n",
170 exampid
, tag
, session
,
172 ll
.Printf("%stag:%s connsess:%s message_number:%v\n",
173 exampid
, tag
, session
,
176 // Check if reader returned any error
178 ll
.Fatalf("%stag:%s connsess:%s error_read error:%v",
179 exampid
, tag
, session
,
180 md
.Error
) // Handle this ......
184 ll
.Printf("%stag:%s connsess:%s frame_type cmd:%s\n",
185 exampid
, tag
, session
,
188 // Pure sanity check: this should *never* happen based on logic
190 if md
.Message
.Command
!= stompngo
.MESSAGE
{
191 ll
.Fatalf("%stag:%s connsess:%s error_frame_type md:%v",
192 exampid
, tag
, session
,
193 md
) // Handle this ......
196 // Show Message Headers
197 wh
:= md
.Message
.Headers
198 for j
:= 0; j
< len(wh
)-1; j
+= 2 {
199 ll
.Printf("%stag:%s connsess:%s Header:%s:%s\n",
200 exampid
, tag
, session
,
203 // Show (part of) Message Body
206 if len(md
.Message
.Body
) < maxlen
{
207 maxlen
= len(md
.Message
.Body
)
209 ss
:= string(md
.Message
.Body
[0:maxlen
])
210 ll
.Printf("%stag:%s connsess:%s payload body:%s\n",
211 exampid
, tag
, session
,
215 // Sanity check this message payload
217 wm := wlp + mcs // The left part plus the (string) message number
218 bm := string(md.Message.Body)
220 ll.Fatalf("%stag:%s connsess:%s error_message_payload\n\tGot %s\n\tWant%s\n",
221 exampid, tag, session,
222 bm, wm) // Handle this ......
224 ll.Printf("%stag:%s connsess:%s matched_body_string\n%s\n%s\n",
225 exampid, tag, session,
226 bm, wm) // Handle this ......)
230 // Run individual ACK if required
231 if am
== stompngo
.AckModeClientIndividual
{
232 wh
:= md
.Message
.Headers
// Copy Headers
233 if ar
{ // ACK receipt wanted
234 wh
= wh
.Add(stompngo
.HK_RECEIPT
, "rwanted-"+mcs
)
236 sngecomm
.HandleAck(conn
, wh
, id
)
237 ll
.Printf("%stag:%s connsess:%s individual_ack_complete mc:%v headers:%v\n",
238 exampid
, tag
, session
,
239 mc
, md
.Message
.Headers
)
243 // Check for end of loop condition
248 // Increment loop/message counter
252 qc
:= make(chan bool)
253 go drainSub(session
, sc
, qc
, qn
)
256 // Issue the final ACK if needed
258 wh
:= lmd
.Message
.Headers
// Copy Headers
259 if ar
{ // ACK receipt wanted
260 wh
= wh
.Add(stompngo
.HK_RECEIPT
, "rwanted-fin")
262 sngecomm
.HandleAck(conn
, wh
, id
)
263 ll
.Printf("%stag:%s connsess:%s final_ack_complete\n",
264 exampid
, tag
, session
)
266 ll
.Printf("%stag:%s connsess:%s ack_receive_wait_for_drain qn:%d\n",
267 exampid
, tag
, session
,
269 <-qc
// Wait for drainSub
270 ll
.Printf("%stag:%s connsess:%s ack_receive_drain_complete qn:%d\n",
271 exampid
, tag
, session
,
279 ll
.Printf("%stag:%s connsess:%s message_loop_wait_for_drain qn:%d\n",
280 exampid
, tag
, session
,
283 ll
.Printf("%stag:%s connsess:%s message_loop_drain_complete qn:%d\n",
284 exampid
, tag
, session
,
288 // Unsubscribe (may be skipped if requested)
290 sngecomm
.HandleUnsubscribe(conn
, d
, id
)
291 ll
.Printf("%stag:%s connsess:%s stomp_unsubscribe_complete\n",
292 exampid
, tag
, session
)
294 ll
.Printf("%stag:%s connsess:%s skipping_unsubscribe\n",
295 exampid
, tag
, session
)
299 // Handle a subscribe for the different protocol levels.
300 func doSubscribe(c
*stompngo
.Connection
, d
, id
, a
string, h stompngo
.Headers
) <-chan stompngo
.MessageData
{
301 h
= h
.Add("destination", d
).Add("ack", a
)
303 switch c
.Protocol() {
304 case stompngo
.SPL_12
:
305 // Add required id header
307 case stompngo
.SPL_11
:
308 // Add required id header
310 case stompngo
.SPL_10
:
311 // Nothing else to do here
313 ll
.Fatalf("v1:%v\n", "subscribe invalid protocol level, should not happen")
316 r
, e
:= c
.Subscribe(h
)
318 ll
.Fatalf("subscribe failed err:[%v]\n", e
)
324 func getReceipt(conn
*stompngo
.Connection
) {
325 rd
:= <-conn
.MessageData
326 ll
.Printf("%stag:%s connsess:%s have_receipt_sub md:%v\n",
327 exampid
, tag
, session
,
331 // Drain a subscription
332 func drainSub(s
string, sc
<-chan stompngo
.MessageData
, dc
chan bool, qn
int) {
333 ll
.Printf("%stag:%s connsess:%s drain_starts qn:%d\n",
336 tmr
:= time
.NewTimer(1 * time
.Second
) // Time to wait, YMMV
341 ll
.Printf("%stag:%s connsess:%s drained md:%v\n",
352 ll
.Printf("%stag:%s connsess:%s drain_ends qn:%d\n",