2 // Copyright © 2014-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
// Show a number of queue writers and readers operating concurrently.
// Try to be realistic about workloads.
// Receivers check each message for the proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. All senders use one STOMP connection.
24 All receivers are balanced across multiple STOMP connections. Balancing
25 configuration is taken from environment variables.
39 "github.com/gmallard/stompngo"
40 // senv methods could be used in general by stompngo clients.
41 "github.com/gmallard/stompngo/senv"
42 // sngecomm methods are used specifically for these example clients.
43 "github.com/gmallard/stompngo_examples/sngecomm"
47 exampid
= "srmgor_1smrconn: "
// We 'stagger' between each message send and message receive for a random
// amount of time, derived from the min and max values below.
// Vary these for experimental purposes. YMMV.
52 max
int64 = 1e9
// Max stagger time (nanoseconds)
53 min
int64 = max
/ 10 // Min stagger time (nanoseconds)
69 ll
= log
.New(os
.Stdout
, "E1SMR ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
75 openSconn opens a stompngo Connection.
77 func openSconn() (net
.Conn
, *stompngo
.Connection
) {
78 ltag
:= tag
+ "-opensconn"
80 // Standard example connect sequence
81 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, ltag
, ll
)
83 ll
.Fatalf("%stag:%s consess:%s connect_error error:%s\n",
84 exampid
, ltag
, sngecomm
.Lcs
,
85 e
.Error()) // Handle this ......
91 closeSconn closes a stompngo Connection.
93 func closeSconn(n net
.Conn
, conn
*stompngo
.Connection
) {
94 ltag
:= tag
+ "-closesconn"
96 // Standard example disconnect sequence
97 e
:= sngecomm
.CommonDisconnect(n
, conn
, exampid
, ltag
, ll
)
99 ll
.Fatalf("%stag:%s connsess:%s disconnect_error error:%s\n",
100 exampid
, ltag
, conn
.Session(),
101 e
.Error()) // Handle this ......
107 runReceive receives all messages from a specified queue.
109 func runReceive(conn
*stompngo
.Connection
, q
int, w
*sync
.WaitGroup
) {
110 ltag
:= tag
+ "-runreceive"
112 qns
:= fmt
.Sprintf("%d", q
) // queue number
113 id
:= stompngo
.Uuid() // A unique subscription ID
114 d
:= sngecomm
.Dest() + "." + string(exampid
[:len(exampid
)-2]) + "." + qns
116 ll
.Printf("%stag:%s connsess:%s starts id:%s qns:%s d:%s\n",
117 exampid
, ltag
, conn
.Session(),
120 // Subscribe (use common helper)
121 sc
:= sngecomm
.HandleSubscribe(conn
, d
, id
, sngecomm
.AckMode())
122 ll
.Printf("%stag:%s connsess:%s subscribe_done id:%s qns:%s d:%s\n",
123 exampid
, ltag
, conn
.Session(),
127 tmr
:= time
.NewTimer(100 * time
.Hour
)
129 pbc
:= sngecomm
.Pbc() // Print byte count
131 nmsgs
:= senv
.Nmsgs()
134 var md stompngo
.MessageData
135 for mc
:= 1; mc
<= nmsgs
; mc
++ {
136 ll
.Printf("%stag:%s connsess:%s chanchek id:%s qns:%s lensc:%d capsc:%d\n",
137 exampid
, ltag
, conn
.Session(),
138 id
, qns
, len(sc
), cap(sc
))
141 case md
= <-conn
.MessageData
:
142 // Frames RECEIPT or ERROR not expected here
143 ll
.Fatalf("%stag:%s connsess:%s send_error qns:%v md:%v",
144 exampid
, ltag
, conn
.Session(),
145 qns
, md
) // Handle this ......
149 ll
.Fatalf("%stag:%s connsess:%s receive_error qns:%v error:%v\n",
150 exampid
, ltag
, conn
.Session(),
154 // Process the inbound message .................
155 ll
.Printf("%stag:%s connsess:%s inbound id:%s qns:%s mc:%d\n",
156 exampid
, ltag
, conn
.Session(),
158 // Sanity check the message Command, and the queue and message numbers
159 mns
:= fmt
.Sprintf("%d", mc
) // string message number
160 if md
.Message
.Command
!= stompngo
.MESSAGE
{
161 ll
.Fatalf("%stag:%s connsess:%s bad_frame qns:%s mc:%d md:%v\n",
162 exampid
, ltag
, conn
.Session(),
165 if !md
.Message
.Headers
.ContainsKV("qnum", qns
) ||
!md
.Message
.Headers
.ContainsKV("msgnum", mns
) {
166 ll
.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v md:%v",
167 exampid
, tag
, conn
.Session(),
168 qns
, mns
, md
) // Handle this ......
171 sl
:= len(md
.Message
.Body
)
174 if len(md
.Message
.Body
) < sl
{
175 sl
= len(md
.Message
.Body
)
179 ll
.Printf("%stag:%s connsess:%s runReceive_recv_message id:%s body:%s qns:%s msgnum:%s\n",
180 exampid
, ltag
, conn
.Session(),
181 id
, string(md
.Message
.Body
[0:sl
]), qns
,
182 md
.Message
.Headers
.Value("msgnum"))
184 // Handle ACKs if needed
185 if sngecomm
.AckMode() != "auto" {
186 ah
:= stompngo
.Headers
{}
187 sngecomm
.HandleAck(conn
, ah
, id
)
193 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, rf
))
194 ll
.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s mc:%d\n",
195 exampid
, ltag
, conn
.Session(),
203 sngecomm
.HandleUnsubscribe(conn
, d
, id
)
205 ll
.Printf("%stag:%s connsess:%s runRecieve_ends id:%s qns:%s\n",
206 exampid
, ltag
, conn
.Session(),
212 receiverConnection starts individual receivers for this connection.
214 func receiverConnection(conn
*stompngo
.Connection
, cn
, qpc
int) {
215 ltag
:= tag
+ "-receiverconnection"
217 ll
.Printf("%stag:%s connsess:%s starts cn:%d qpc:%d\n",
218 exampid
, ltag
, conn
.Session(),
221 // cn -> a connection number: 1..n
222 // qpc -> destinations per connection
228 // This code runs *once* for each connection
230 // These calcs are what causes a skip below. It is a safety valve to keep
231 // from starting one too many connections.
232 cb
:= cn
- 1 // this connection number, zero based
233 q1
:= qpc
*cb
+ 1 // 1st queue number
234 ql
:= q1
+ qpc
- 1 // last queue number
235 if ql
> sngecomm
.Nqs() {
236 ql
= sngecomm
.Nqs() // truncate last if over max destinations
239 var wgrconn sync
.WaitGroup
243 ll
.Printf("%stag:%s connsess:%s startq cn:%d q1:%d ql: %d\n",
244 exampid
, ltag
, conn
.Session(),
248 // Skips are possible, at least with the current calling code, see above
249 ll
.Printf("%stag:%s connsess:%s startskip cn:%d q1:%d ql: %d\n",
250 exampid
, ltag
, conn
.Session(),
255 for q
:= q1
; q
<= ql
; q
++ {
257 go runReceive(conn
, q
, &wgrconn
)
261 ll
.Printf("%stag:%s connsess:%s ends cn:%d qpc:%d skipped:%t\n",
262 exampid
, ltag
, conn
.Session(),
// startReceivers creates connections per environment variables, and starts each
// connection's receivers.
271 func startReceivers() {
273 ltag
:= tag
+ "-startreceivers"
275 // This was a performance experiment. With number of connections.
276 // My recollection is that it did not work out.
277 // However ..... I will leave this code in place for now.
279 // Figure out number of receiver connections wanted
280 nrc
:= sngecomm
.Nqs() // 1 receiver per each destination
281 nqs
:= nrc
// Number of queues (destinations) starts the same
283 if s
:= os
.Getenv("STOMP_RECVCONNS"); s
!= "" {
284 i
, e
:= strconv
.ParseInt(s
, 10, 32)
286 ll
.Fatalf("%stag:%s connsess:%s RECVCONNS_conversion_error error:%v\n",
287 exampid
, ltag
, sngecomm
.Lcs
,
294 // Limit max receiver connection count to number of destinations
299 // Next calc. destinations per receiver
300 dpr
:= nqs
/ nrc
// Calculation first guess.
302 dpr
+= 1 // Bump destinations per receiver by 1.
304 // Destinations per receiver must be at least 1
309 ll
.Printf("%stag:%s connsess:%s start nrc:%d dpr:%d\n",
310 exampid
, ltag
, sngecomm
.Lcs
,
313 // So the idea seems to be allow more than one destination per receiver
314 ncm
:= make([]net
.Conn
, 0)
315 csm
:= make([]*stompngo
.Connection
, 0)
316 for c
:= 1; c
<= nrc
; c
++ { // :-)
317 n
, conn
:= openSconn()
319 csm
= append(csm
, conn
)
321 ll
.Printf("%stag:%s connsess:%s connstart conn_number:%d nrc:%d dpr:%d\n",
322 exampid
, ltag
, conn
.Session(),
324 go receiverConnection(conn
, c
, dpr
)
327 ll
.Printf("%stag:%s connsess:%s wait_done nrc:%d dpr:%d\n",
328 exampid
, ltag
, sngecomm
.Lcs
,
331 for c
:= 1; c
<= nrc
; c
++ {
332 ll
.Printf("%stag:%s connsess:%s connend conn_number:%d nrc:%d dpr:%d\n",
333 exampid
, ltag
, csm
[c
-1].Session(),
335 sngecomm
.ShowStats(exampid
, ltag
, csm
[c
-1])
336 closeSconn(ncm
[c
-1], csm
[c
-1])
343 runSender sends all messages to a specified queue.
345 func runSender(conn
*stompngo
.Connection
, qns
string) {
346 ltag
:= tag
+ "-runsender"
348 d
:= sngecomm
.Dest() + "." + string(exampid
[:len(exampid
)-2]) + "." + qns
349 id
:= stompngo
.Uuid() // A unique sender id
350 ll
.Printf("%stag:%s connsess:%s start id:%s dest:%s\n",
351 exampid
, ltag
, conn
.Session(),
353 wh
:= stompngo
.Headers
{"destination", d
, "senderId", id
,
354 "qnum", qns
} // basic send Headers
355 if senv
.Persistent() {
356 wh
= wh
.Add("persistent", "true")
358 tmr
:= time
.NewTimer(100 * time
.Hour
)
359 nmsgs
:= senv
.Nmsgs()
360 for mc
:= 1; mc
<= nmsgs
; mc
++ {
361 sh
:= append(wh
, "msgnum", fmt
.Sprintf("%d", mc
))
362 // Generate a message to send ...............
363 ll
.Printf("%stag:%s connsess:%s send id:%s qns:%s mc:%d\n",
364 exampid
, ltag
, conn
.Session(),
366 e
:= conn
.Send(sh
, string(sngecomm
.Partial()))
368 ll
.Fatalf("%stag:%s connsess:%s send_error qns:%v error:%v",
369 exampid
, ltag
, conn
.Session(),
370 qns
, e
.Error()) // Handle this ......
376 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
377 ll
.Printf("%stag:%s connsess:%s send_stagger dt:%v qns:%s mc:%d\n",
378 exampid
, ltag
, conn
.Session(),
385 ll
.Printf("%stag:%s connsess:%s end id:%s dest:%s\n",
386 exampid
, ltag
, conn
.Session(),
// startSender initializes the single send connection, and starts one sender
// goroutine for each destination.
397 ltag
:= tag
+ "-startsender"
399 n
, conn
:= openSconn()
400 ll
.Printf("%stag:%s connsess:%s start\n",
401 exampid
, ltag
, conn
.Session())
402 for i
:= 1; i
<= sngecomm
.Nqs(); i
++ {
404 go runSender(conn
, fmt
.Sprintf("%d", i
))
407 ll
.Printf("%stag:%s connsess:%s end\n",
408 exampid
, ltag
, conn
.Session())
409 sngecomm
.ShowStats(exampid
, ltag
, conn
)
416 main is the driver for all logic.
422 sngecomm
.ShowRunParms(exampid
)
424 ll
.Printf("%stag:%s connsess:%s main_starts\n",
425 exampid
, tag
, sngecomm
.Lcs
)
427 ll
.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
428 exampid
, tag
, sngecomm
.Lcs
,
431 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
432 exampid
, tag
, sngecomm
.Lcs
,
433 runtime
.GOMAXPROCS(-1))
435 if sngecomm
.SetMAXPROCS() {
436 nc
:= runtime
.NumCPU()
437 ll
.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
438 exampid
, tag
, sngecomm
.Lcs
,
440 gmp
:= runtime
.GOMAXPROCS(nc
)
441 ll
.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
442 exampid
, tag
, sngecomm
.Lcs
,
444 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
445 exampid
, tag
, sngecomm
.Lcs
,
446 runtime
.GOMAXPROCS(-1))
449 sw
= sngecomm
.SendWait()
450 rw
= sngecomm
.RecvWait()
451 sf
= sngecomm
.SendFactor()
452 rf
= sngecomm
.RecvFactor()
453 ll
.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
454 exampid
, tag
, sngecomm
.Lcs
,
464 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
465 exampid
, tag
, sngecomm
.Lcs
,
467 time
.Sleep(250 * time
.Millisecond
)