2 // Copyright © 2011-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. All senders share a single
24 STOMP connection, as do all receivers.
38 "github.com/gmallard/stompngo"
39 // senv methods could be used in general by stompngo clients.
40 "github.com/gmallard/stompngo/senv"
41 // sngecomm methods are used specifically for these example clients.
42 "github.com/gmallard/stompngo_examples/sngecomm"
46 exampid
= "srmgor_2conn: "
52 // We 'stagger' between each message send and message receive for a random amount of time.
54 // Vary these for experimental purposes. YMMV.
55 max
int64 = 1e9
// Max stagger time (nanoseconds)
56 min
int64 = max
/ 10 // Min stagger time (nanoseconds)
66 // Possible profile file
67 cpuprofile
= flag
.String("cpuprofile", "", "write cpu profile to file")
69 ll
= log
.New(os
.Stdout
, "E1S1R ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
74 // Send messages to a particular queue
75 func sender(conn
*stompngo
.Connection
, qn
, nmsgs
int) {
76 ltag
:= tag
+ "-sender"
78 qns
:= fmt
.Sprintf("%d", qn
) // queue number
79 d
:= sngecomm
.Dest() + "." + string(exampid
[:len(exampid
)-2]) + "." + qns
80 ll
.Printf("%stag:%s connsess:%s starts qn:%d nmsgs:%d d:%s\n",
81 exampid
, ltag
, conn
.Session(),
84 wh
:= stompngo
.Headers
{"destination", d
,
85 "qnum", qns
} // send Headers
86 if senv
.Persistent() {
87 wh
= wh
.Add("persistent", "true")
90 tmr
:= time
.NewTimer(100 * time
.Hour
)
92 for i
:= 1; i
<= nmsgs
; i
++ {
93 si
:= fmt
.Sprintf("%d", i
)
94 sh
:= append(wh
, "msgnum", si
)
95 // Generate a message to send ...............
96 ll
.Printf("%stag:%s connsess:%s message qns:%s si:%s\n",
97 exampid
, ltag
, conn
.Session(),
99 e
:= conn
.Send(sh
, string(sngecomm
.Partial()))
101 ll
.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
102 exampid
, ltag
, conn
.Session(),
103 qn
, e
.Error()) // Handle this ......
109 runtime
.Gosched() // yield for this example
110 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
111 ll
.Printf("%stag:%s connsess:%s send_stagger dt:%v qns:%s\n",
112 exampid
, ltag
, conn
.Session(),
119 ll
.Printf("%stag:%s connsess:%s sender_ends qn:%d nmsgs:%d\n",
120 exampid
, ltag
, conn
.Session(),
125 // Asynchronously process all messages for a given subscription.
126 func receiveWorker(sc
<-chan stompngo
.MessageData
, qns
string, nmsgs
int,
127 qc
chan<- bool, conn
*stompngo
.Connection
, id
string) {
129 ltag
:= tag
+ "-receiveWorker"
131 tmr
:= time
.NewTimer(100 * time
.Hour
)
133 pbc
:= sngecomm
.Pbc() // Print byte count
136 var md stompngo
.MessageData
137 for i
:= 1; i
<= nmsgs
; i
++ {
141 case md
= <-conn
.MessageData
:
142 // Frames RECEIPT or ERROR not expected here
143 ll
.Fatalf("%stag:%s connsess:%s bad_frame qns:%v md:%v",
144 exampid
, ltag
, conn
.Session(),
145 qns
, md
) // Handle this ......
148 ll
.Fatalf("%stag:%s connsess:%s recv_error qns:%v error:%v",
149 exampid
, ltag
, conn
.Session(),
150 qns
, md
.Error
) // Handle this ......
153 // Sanity check the queue and message numbers
154 mns
:= fmt
.Sprintf("%d", i
) // message number
155 if !md
.Message
.Headers
.ContainsKV("qnum", qns
) ||
!md
.Message
.Headers
.ContainsKV("msgnum", mns
) {
156 ll
.Fatalf("%stag:%s connsess:%s dirty_message qnum:%v msgnum:%v md:%v",
157 exampid
, ltag
, conn
.Session(),
158 qns
, mns
, md
) // Handle this ......
161 // Process the inbound message .................
162 sl
:= len(md
.Message
.Body
)
165 if len(md
.Message
.Body
) < sl
{
166 sl
= len(md
.Message
.Body
)
170 // Handle ACKs if needed
171 if sngecomm
.AckMode() != "auto" {
173 sngecomm
.HandleAck(conn
, ah
, id
)
175 ll
.Printf("%stag:%s connsess:%s recv_message body:%s qns:%s msgnum:%s i:%v\n",
176 exampid
, ltag
, conn
.Session(),
177 string(md
.Message
.Body
[0:sl
]),
179 md
.Message
.Headers
.Value("msgnum"), i
)
184 runtime
.Gosched() // yield for this example
185 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, rf
))
186 ll
.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s\n",
187 exampid
, ltag
, conn
.Session(),
197 // Receive messages from a particular queue
198 func receiver(conn
*stompngo
.Connection
, qn
, nmsgs
int) {
199 ltag
:= tag
+ "-receiver"
201 qns
:= fmt
.Sprintf("%d", qn
) // queue number
202 ll
.Printf("%stag:%s connsess:%s starts qns:%d nmsgs:%d\n",
203 exampid
, ltag
, conn
.Session(),
206 qp
:= sngecomm
.Dest() // queue name prefix
207 q
:= qp
+ "." + string(exampid
[:len(exampid
)-2]) + "." + qns
208 ll
.Printf("%stag:%s connsess:%s queue_info q:%s qn:%d nmsgs:%d\n",
209 exampid
, ltag
, conn
.Session(),
211 id
:= stompngo
.Uuid() // A unique subscription ID
212 sc
:= sngecomm
.HandleSubscribe(conn
, q
, id
, sngecomm
.AckMode())
213 ll
.Printf("%stag:%s connsess:%s subscribe_complete\n",
214 exampid
, ltag
, conn
.Session())
215 // Many receivers running under the same connection can cause
216 // (wire read) performance issues. This is *very* dependent on the broker
217 // being used, specifically the broker's algorithm for putting messages on the wire.
219 // To alleviate those issues, this strategy ensures that messages are
220 // received from the wire as soon as possible. Those messages are then
221 // buffered internally for (possibly later) application processing.
224 if s
:= os
.Getenv("STOMP_CONN2BUFFER"); s
!= "" {
225 i
, e
:= strconv
.ParseInt(s
, 10, 32)
227 ll
.Fatalf("%stag:%s connsess:%s CONN2BUFFER_conversion_error error:%v",
228 exampid
, ltag
, conn
.Session(),
229 e
.Error()) // Handle this ......
238 ll
.Printf("%stag:%s connsess:%s mdbuffersize_qnum bs:%d qn:%d\n",
239 exampid
, ltag
, conn
.Session(),
242 // Process all inputs async .......
243 // var mc chan stompngo.MessageData
244 mdc
:= make(chan stompngo
.MessageData
, bs
) // MessageData Buffer size
245 dc
:= make(chan bool) // Receive processing done channel
246 go receiveWorker(mdc
, qns
, nmsgs
, dc
, conn
, id
) // Start async processor
247 for i
:= 1; i
<= nmsgs
; i
++ {
248 mdc
<- <-sc
// Receive message data as soon as possible, and internally queue it
250 ll
.Printf("%stag:%s connsess:%s waitforWorkersBegin qns:%s\n",
251 exampid
, ltag
, conn
.Session(),
253 <-dc
// Wait until receive processing is done for this queue
254 ll
.Printf("%stag:%s connsess:%s waitforWorkersEnd qns:%s\n",
255 exampid
, ltag
, conn
.Session(),
259 sngecomm
.HandleUnsubscribe(conn
, q
, id
)
260 ll
.Printf("%stag:%s connsess:%s unsubscribe_complete\n",
261 exampid
, ltag
, conn
.Session())
264 ll
.Printf("%stag:%s connsess:%s ends qns:%s\n",
265 exampid
, ltag
, conn
.Session(),
270 func startSenders(qn
int) {
271 ltag
:= tag
+ "-startsenders"
273 ll
.Printf("%stag:%s connsess:%s queue qn:%v\n",
274 exampid
, ltag
, sngecomm
.Lcs
,
277 // Standard example connect sequence
278 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, ltag
, ll
)
280 ll
.Fatalf("%stag:%s connsess:%s on_connect error:%v",
281 exampid
, ltag
, sngecomm
.Lcs
,
282 e
.Error()) // Handle this ......
285 nmsgs
:= senv
.Nmsgs() // message count
286 ll
.Printf("%stag:%s connsess:%s message_count nmsgs:%d qn:%d\n",
287 exampid
, ltag
, conn
.Session(),
289 for i
:= 1; i
<= qn
; i
++ { // all queues
291 go sender(conn
, i
, nmsgs
)
293 ll
.Printf("%stag:%s connsess:%s starts_done\n",
294 exampid
, ltag
, conn
.Session())
297 // Standard example disconnect sequence
298 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, ltag
, ll
)
300 ll
.Fatalf("%stag:%s connsess:%s on_disconnect error:%v",
301 exampid
, ltag
, conn
.Session(),
302 e
.Error()) // Handle this ......
305 sngecomm
.ShowStats(exampid
, ltag
, conn
)
309 func startReceivers(qn
int) {
310 ltag
:= tag
+ "-startreceivers"
312 ll
.Printf("%stag:%s connsess:%s starts qn:%d\n",
313 exampid
, ltag
, sngecomm
.Lcs
,
316 // Standard example connect sequence
317 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, ltag
, ll
)
319 ll
.Fatalf("%stag:%s connsess:%s on_connect error:%v",
320 exampid
, ltag
, sngecomm
.Lcs
,
321 e
.Error()) // Handle this ......
324 nmsgs
:= senv
.Nmsgs() // get message count
325 ll
.Printf("%stag:%s connsess:%s message_count nmsgs:%d qn:%d\n",
326 exampid
, ltag
, conn
.Session(),
328 for i
:= 1; i
<= qn
; i
++ { // all queues
330 go receiver(conn
, i
, nmsgs
)
332 ll
.Printf("%stag:%s connsess:%s starts_done\n",
333 exampid
, ltag
, conn
.Session())
336 // Standard example disconnect sequence
337 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, ltag
, ll
)
339 ll
.Fatalf("%stag:%s connsess:%s on_disconnect error:%v",
340 exampid
, ltag
, conn
.Session(),
341 e
.Error()) // Handle this ......
344 sngecomm
.ShowStats(exampid
, ltag
, conn
)
348 // Show a number of writers and readers operating concurrently from unique destinations.
354 sngecomm
.ShowRunParms(exampid
)
356 ll
.Printf("%stag:%s connsess:%s main_starts\n",
357 exampid
, tag
, sngecomm
.Lcs
)
359 ll
.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
360 exampid
, tag
, sngecomm
.Lcs
,
363 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
364 exampid
, tag
, sngecomm
.Lcs
,
365 runtime
.GOMAXPROCS(-1))
367 if sngecomm
.SetMAXPROCS() {
368 nc
:= runtime
.NumCPU()
369 ll
.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
370 exampid
, tag
, sngecomm
.Lcs
,
372 gmp
:= runtime
.GOMAXPROCS(nc
)
373 ll
.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
374 exampid
, tag
, sngecomm
.Lcs
,
376 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
377 exampid
, tag
, sngecomm
.Lcs
,
378 runtime
.GOMAXPROCS(-1))
381 sw
= sngecomm
.SendWait()
382 rw
= sngecomm
.RecvWait()
383 sf
= sngecomm
.SendFactor()
384 rf
= sngecomm
.RecvFactor()
385 ll
.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
386 exampid
, tag
, sngecomm
.Lcs
,
396 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
397 exampid
, tag
, sngecomm
.Lcs
,
399 time
.Sleep(250 * time
.Millisecond
)