2 // Copyright © 2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
19 Function: read the code :-).
30 "github.com/gmallard/stompngo"
31 "github.com/gmallard/stompngo/senv"
32 "github.com/gmallard/stompngo_examples/sngecomm"
36 exampid
= "varmGetter: "
37 ll
= log
.New(os
.Stdout
, "VRMG ", log
.Ldate|log
.Lmicroseconds
)
41 ar
= false // Want ACK RECEIPT
45 if os
.Getenv("VMG_NOUNSUB") != "" {
48 if os
.Getenv("VMG_NODISC") != "" {
51 if os
.Getenv("VMG_GETAR") != "" {
56 // Connect to a STOMP broker, subscribe and receive some messages and disconnect.
61 // Standard example connect sequence
62 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, tag
, ll
)
64 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
65 exampid
, tag
, sngecomm
.Lcs
,
66 e
.Error()) // Handle this ......
69 conn
.SetLogger(ll
) // stompngo logging
70 pbc
:= sngecomm
.Pbc() // Print byte count
71 d
:= senv
.Dest() // Destination
72 id
:= stompngo
.Uuid() // A unique name/id
73 nmsgs
:= senv
.Nmsgs() // int number of messages to get
74 mns
:= fmt
.Sprintf("%d", nmsgs
) // string number of messages to get
75 am
:= sngecomm
.AckMode() // ACK mode to use on SUBSCRIBE
76 nfa
:= true // Need "final" ACK (possiby reset below)
77 wh
:= stompngo
.Headers
{ // Starting SUBSCRIBE headers
78 stompngo
.StompPlusDrainAfter
,
79 mns
} // Need a string here
81 // Sanity check ACK mode
82 if conn
.Protocol() == stompngo
.SPL_10
&&
83 am
== stompngo
.AckModeClientIndividual
{
84 ll
.Fatalf("%stag:%s connsess:%s invalid_ack_mode am:%v proto:%v\n",
85 exampid
, tag
, conn
.Session(),
86 am
, conn
.Protocol()) //
88 // Do not do final ACK if running ACKs are issued
89 if am
== stompngo
.AckModeClientIndividual ||
90 am
== stompngo
.AckModeAuto
{
94 // Show run parameters
95 ll
.Printf("%stag:%s connsess:%s run_parms\n\tpbc:%v\n\td:%v\n\tid:%v\n\tnmsgs:%v\n\tam:%v\n\tnfa:%v\n\twh:%v\n",
96 exampid
, tag
, conn
.Session(),
97 pbc
, d
, id
, nmsgs
, am
, nfa
, wh
)
100 sc
:= doSubscribe(conn
, d
, id
, am
, wh
)
101 ll
.Printf("%stag:%s connsess:%s stomp_subscribe_complete\n",
102 exampid
, tag
, conn
.Session())
104 var md stompngo
.MessageData
// Message data from basic read
105 var lmd stompngo
.MessageData
// Possible save (copy) of received data
106 mc
:= 1 // Initial message number
108 // Loop for the requested number of messages
111 ll
.Printf("%stag:%s connsess:%s start_of_read_loop mc:%v nmsgs:%v\n",
112 exampid
, tag
, conn
.Session(), mc
, nmsgs
)
114 mcs
:= fmt
.Sprintf("%d", mc
) // string number message count
116 // Get something from the stompngo read routine
119 case md
= <-conn
.MessageData
:
121 if md
.Message
.Command
== stompngo
.RECEIPT
{
122 ll
.Printf("%stag:%s connsess:%s have_receipt md:%v\n",
123 exampid
, tag
, conn
.Session(),
127 ll
.Fatalf("%stag:%s connsess:%s ERROR_frame hdrs:%v body:%v\n",
128 exampid
, tag
, conn
.Session(),
129 md
.Message
.Headers
, string(md
.Message
.Body
)) // Handle this ......
132 // Save message data for possible use in the final ACK
133 if mc
== nmsgs
&& nfa
{
134 lmd
= md
// Save last message
137 // Basic loop logging
138 ll
.Printf("%stag:%s connsess:%s channel_read_complete\n",
139 exampid
, tag
, conn
.Session())
140 ll
.Printf("%stag:%s connsess:%s message_number:%v\n",
141 exampid
, tag
, conn
.Session(),
144 // Check if reader returned any error
146 ll
.Fatalf("%stag:%s connsess:%s error_read error:%v",
147 exampid
, tag
, conn
.Session(),
148 md
.Error
) // Handle this ......
152 ll
.Printf("%stag:%s connsess:%s frame_type cmd:%s\n",
153 exampid
, tag
, conn
.Session(),
156 // Pure sanity check: this should *never* happen based on logic
158 if md
.Message
.Command
!= stompngo
.MESSAGE
{
159 ll
.Fatalf("%stag:%s connsess:%s error_frame_type md:%v",
160 exampid
, tag
, conn
.Session(),
161 md
) // Handle this ......
164 // Show Message Headers
165 wh
:= md
.Message
.Headers
166 for j
:= 0; j
< len(wh
)-1; j
+= 2 {
167 ll
.Printf("%stag:%s connsess:%s Header:%s:%s\n",
168 exampid
, tag
, conn
.Session(),
171 // Show (part of) Message Body
174 if len(md
.Message
.Body
) < maxlen
{
175 maxlen
= len(md
.Message
.Body
)
177 ss
:= string(md
.Message
.Body
[0:maxlen
])
178 ll
.Printf("%stag:%s connsess:%s payload body:%s\n",
179 exampid
, tag
, conn
.Session(),
183 // Run individual ACK if required
184 if am
== stompngo
.AckModeClientIndividual
{
185 wh
:= md
.Message
.Headers
// Copy Headers
186 if ar
{ // ACK receipt wanted
187 wh
= wh
.Add(stompngo
.HK_RECEIPT
, "rwanted-"+mcs
)
189 sngecomm
.HandleAck(conn
, wh
, id
)
190 ll
.Printf("%stag:%s connsess:%s individual_ack_complete mc:%v headers:%v\n",
191 exampid
, tag
, conn
.Session(),
192 mc
, md
.Message
.Headers
)
196 // Check for end of loop condition
201 // Increment loop/message counter
205 // Issue the final ACK if needed
207 wh
:= lmd
.Message
.Headers
// Copy Headers
208 if ar
{ // ACK receipt wanted
209 wh
= wh
.Add(stompngo
.HK_RECEIPT
, "rwanted-fin")
211 sngecomm
.HandleAck(conn
, wh
, id
)
212 ll
.Printf("%stag:%s connsess:%s final_ack_complete\n",
213 exampid
, tag
, conn
.Session())
219 // Unsubscribe (may be skipped if requested)
221 sngecomm
.HandleUnsubscribe(conn
, d
, id
)
222 ll
.Printf("%stag:%s connsess:%s stomp_unsubscribe_complete\n",
223 exampid
, tag
, conn
.Session())
225 ll
.Printf("%stag:%s connsess:%s skipping_unsubscribe\n",
226 exampid
, tag
, conn
.Session())
229 // Standard example disconnect sequence (may be skipped if requested)
230 // a) Closes the *stompngo.Connection
231 // b) Closes the net.Conn
233 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, tag
, ll
)
235 ll
.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
236 exampid
, tag
, conn
.Session(),
237 e
.Error()) // Handle this ......
239 ll
.Printf("%stag:%s connsess:%s disconnect_receipt:%v\n",
240 exampid
, tag
, conn
.Session(),
241 conn
.DisconnectReceipt
)
243 ll
.Printf("%stag:%s connsess:%s skipping_disconnect\n",
244 exampid
, tag
, conn
.Session())
247 // End of work logging, show elapsed time
248 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
249 exampid
, tag
, conn
.Session(),
253 // Handle a subscribe for the different protocol levels.
254 func doSubscribe(c
*stompngo
.Connection
, d
, id
, a
string, h stompngo
.Headers
) <-chan stompngo
.MessageData
{
255 h
= h
.Add("destination", d
).Add("ack", a
)
257 switch c
.Protocol() {
258 case stompngo
.SPL_12
:
259 // Add required id header
261 case stompngo
.SPL_11
:
262 // Add required id header
264 case stompngo
.SPL_10
:
265 // Nothing else to do here
267 ll
.Fatalf("v1:%v\n", "subscribe invalid protocol level, should not happen")
270 r
, e
:= c
.Subscribe(h
)
272 ll
.Fatalf("subscribe failed err:[%v]\n", e
)
278 func getReceipt(conn
*stompngo
.Connection
) {
279 rd
:= <-conn
.MessageData
280 ll
.Printf("%stag:%s connsess:%s have_receipt_sub md:%v\n",
281 exampid
, tag
, conn
.Session(),