2 // Copyright © 2012-2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. Each sender and receiver
24 operates under a unique network connection.
28 # A few queues and a few messages:
29 STOMP_NQS=5 STOMP_NMSGS=10 go run srmgor_manyconn.go
43 "github.com/davecheney/profile"
45 "github.com/gmallard/stompngo"
46 // senv methods could be used in general by stompngo clients.
47 "github.com/gmallard/stompngo/senv"
48 // sngecomm methods are used specifically for these example clients.
49 "github.com/gmallard/stompngo_examples/sngecomm"
// exampid is passed as the first argument of every log call in this file, so
// it leads each emitted log line.
53 exampid
= "srmgor_manyconn: "
58 // We 'stagger' between each message send and message receive for a random
// NOTE(review): the sentence above is cut off in this listing. min and max
// below are the bounds handed to sngecomm.ValueBetween when a stagger
// duration is computed.
60 // Vary these for experimental purposes. YMMV.
// 1e9 nanoseconds is one second.
61 max
int64 = 1e9
// Max stagger time (nanoseconds)
// max / 10 is 1e8 nanoseconds, i.e. 100 milliseconds.
62 min
int64 = max
/ 10 // Min stagger time (nanoseconds)
// ll is the logger used by every function in this file: it writes to
// os.Stdout with the prefix "EMSMR " and the date, microsecond and
// short-file flags set.
75 ll
= log
.New(os
.Stdout
, "EMSMR ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
// sendMessages sends messages on conn to the destination for queue number
// qnum. The destination is sngecomm.Dest() + "." + the decimal queue number.
// Every message carries the "destination" and "qnum" headers, a "msgnum"
// header holding the loop counter (1..nmsgs), and "persistent: true" when
// senv.Persistent() reports true. The body is string(sngecomm.Partial()).
//
// nc is not referenced in the lines visible here.
//
// NOTE(review): the line numbers embedded in this listing skip (for example
// 104 -> 106 and 108 -> 114), so some source lines are not visible --
// presumably the error guard, the loop exit and the closing braces. Confirm
// the control flow against the repository copy before changing anything.
80 func sendMessages(conn
*stompngo
.Connection
, qnum
int, nc net
.Conn
) {
// ltag identifies this function in every log line below.
81 ltag
:= tag
+ "-sendmessages"
83 qns
:= fmt
.Sprintf("%d", qnum
) // queue number
// One destination per queue number.
84 d
:= sngecomm
.Dest() + "." + qns
85 ll
.Printf("%stag:%s connsess:%s start d:%s qnum:%d\n",
86 exampid
, ltag
, conn
.Session(),
// Base headers shared by every message sent from this function.
88 wh
:= stompngo
.Headers
{"destination", d
,
89 "qnum", qns
} // send Headers
90 if senv
.Persistent() {
91 wh
= wh
.Add("persistent", "true")
// The timer is created with a 100 hour duration. Its use is not visible in
// this listing -- presumably it is reset for each stagger delay (confirm).
94 tmr
:= time
.NewTimer(100 * time
.Hour
)
// Send loop: mc is the 1-based message number.
96 for mc
:= 1; mc
<= nmsgs
; mc
++ {
97 mcs
:= fmt
.Sprintf("%d", mc
)
// Per-message headers: the base headers plus this message's number.
98 sh
:= append(wh
, "msgnum", mcs
)
99 // Generate a message to send ...............
101 ll
.Printf("%stag:%s connsess:%s message mc:%d qnum:%d\n",
102 exampid
, ltag
, conn
.Session(),
104 e
:= conn
.Send(sh
, string(sngecomm
.Partial()))
// NOTE(review): the guard around this Fatalf (presumably a nil check on e)
// is not visible in this listing. Fatalf on a *log.Logger ends the process.
106 ll
.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
107 exampid
, ltag
, conn
.Session(),
108 qnum
, e
.Error()) // Handle this ......
// NOTE(review): the condition enclosing the stagger code below is not
// visible (embedded lines 109-113 are absent).
114 runtime
.Gosched() // yield for this example
// Stagger duration computed from min, max and the send factor sf.
115 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
116 ll
.Printf("%stag:%s connsess:%s send_stagger dt:%v qnum:%s mc:%d\n",
117 exampid
, ltag
, conn
.Session(),
// receiveMessages subscribes conn to the destination for queue number qnum
// under a freshly generated subscription id, loops over message numbers
// 1..nmsgs checking each inbound frame, and finally unsubscribes.
//
// For each frame the visible checks are: the command must be
// stompngo.MESSAGE, and the "qnum" and "msgnum" headers must equal this
// queue's number and the current loop counter. A failed check is logged with
// Fatalf, which ends the process. When sngecomm.AckMode() is not "auto" the
// message is acknowledged through sngecomm.HandleAck.
//
// nc is not referenced in the lines visible here.
//
// NOTE(review): the line numbers embedded in this listing skip (for example
// 143 -> 147 and 183 -> 187), so some source lines are not visible --
// presumably the select statement header, the receive from the subscription,
// several guards and the closing braces. Confirm the control flow against the
// repository copy before changing anything.
125 func receiveMessages(conn
*stompngo
.Connection
, qnum
int, nc net
.Conn
) {
// ltag identifies this function in the log lines below.
126 ltag
:= tag
+ "-receivemessages"
128 qns
:= fmt
.Sprintf("%d", qnum
) // queue number
// Same destination naming as the sender: base destination, dot, queue number.
129 d
:= sngecomm
.Dest() + "." + qns
130 id
:= stompngo
.Uuid() // A unique subscription ID
132 ll
.Printf("%stag:%s connsess:%s receiveMessages_start id:%s d:%s qnum:%d nmsgs:%d\n",
133 exampid
, ltag
, conn
.Session(),
// sc is whatever HandleSubscribe returns for this subscription. The receive
// from it is not visible in this listing (presumably at the absent embedded
// lines 144-146).
136 sc
:= sngecomm
.HandleSubscribe(conn
, d
, id
, sngecomm
.AckMode())
// NOTE(review): the use of pbc is not visible here; presumably it caps the
// number of body bytes logged (see the sl handling further down) -- confirm.
138 pbc
:= sngecomm
.Pbc() // Print byte count
// The timer is created with a 100 hour duration. Its use is not visible in
// this listing -- presumably it is reset for each stagger delay (confirm).
141 tmr
:= time
.NewTimer(100 * time
.Hour
)
142 var md stompngo
.MessageData
// Receive loop: mc is the 1-based message number expected next.
143 for mc
:= 1; mc
<= nmsgs
; mc
++ {
// A frame arriving on conn.MessageData is treated as fatal in this case arm.
147 case md
= <-conn
.MessageData
:
148 // Frames RECEIPT or ERROR not expected here
149 ll
.Fatalf("%stag:%s connsess:%s send_error qns:%v md:%v",
150 exampid
, ltag
, conn
.Session(),
151 qns
, md
) // Handle this ......
// NOTE(review): the guard around this Fatalf (presumably a check of the
// error carried by md) is not visible in this listing.
154 ll
.Fatalf("%stag:%s connsess:%s receive_error qns:%v error:%v\n",
155 exampid
, ltag
, conn
.Session(),
// Only MESSAGE frames are acceptable at this point.
159 if md
.Message
.Command
!= stompngo
.MESSAGE
{
160 ll
.Fatalf("%stag:%s connsess:%s bad_frame qns:%s mc:%d md:%v\n",
161 exampid
, ltag
, conn
.Session(),
165 mcs
:= fmt
.Sprintf("%d", mc
) // message number
// The message must belong to this queue and carry the expected number.
166 if !md
.Message
.Headers
.ContainsKV("qnum", qns
) ||
!md
.Message
.Headers
.ContainsKV("msgnum", mcs
) {
// NOTE(review): this call passes tag where the other log calls in this
// function pass ltag -- confirm whether that is intentional.
167 ll
.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v md:%v",
168 exampid
, tag
, conn
.Session(),
169 qns
, mcs
, md
) // Handle this ......
172 // Process the inbound message .................
// sl is the number of body bytes included in the log line below.
173 sl
:= len(md
.Message
.Body
)
// Clamp sl to the body length so the slice expression below stays in range.
176 if len(md
.Message
.Body
) < sl
{
177 sl
= len(md
.Message
.Body
)
180 ll
.Printf("%stag:%s connsess:%s receiveMessages_msg d:%s body:%s qnum:%d msgnum:%s\n",
181 exampid
, ltag
, conn
.Session(),
182 d
, string(md
.Message
.Body
[0:sl
]), qnum
,
183 md
.Message
.Headers
.Value("msgnum"))
187 // Handle ACKs if needed
188 if sngecomm
.AckMode() != "auto" {
// NOTE(review): ah is not defined in the lines visible here (embedded line
// 189 is absent); presumably it holds the headers needed for the ACK.
190 sngecomm
.HandleAck(conn
, ah
, id
)
// NOTE(review): the condition enclosing the stagger code below is not
// visible (embedded lines 191-196 are absent).
197 runtime
.Gosched() // yield for this example
// Stagger duration computed from min, max and the receive factor rf.
198 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, rf
))
199 ll
.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s mc:%d\n",
200 exampid
, ltag
, conn
.Session(),
206 ll
.Printf("%stag:%s connsess:%s end d:%s qnum:%d nmsgs:%d\n",
207 exampid
, ltag
, conn
.Session(),
// Drop the subscription that was created at the top of this function.
211 sngecomm
.HandleUnsubscribe(conn
, d
, id
)
// runReceiver runs the receiving side for queue number qnum. In the order
// visible here it: connects with sngecomm.CommonConnect, sets the
// subscription channel capacity from senv.SubChanCap(), calls
// receiveMessages, disconnects with sngecomm.CommonDisconnect, and reports
// statistics with sngecomm.ShowStats under the label "recv_<qnum>".
//
// NOTE(review): the line numbers embedded in this listing skip (for example
// 223 -> 225 and 240 -> 242), so the guards around the Fatalf calls
// (presumably nil checks on e) and the end of the function are not visible.
// Confirm against the repository copy before changing anything.
215 func runReceiver(qnum
int) {
// ltag identifies this function in the log lines below.
216 ltag
:= tag
+ "-runreceiver"
// No connection exists yet, so sngecomm.Lcs is logged in the connsess slot.
218 ll
.Printf("%stag:%s connsess:%s start qnum:%d\n",
219 exampid
, ltag
, sngecomm
.Lcs
,
222 // Standard example connect sequence
223 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, ltag
, ll
)
// NOTE(review): guard not visible in this listing. Fatalf ends the process.
225 ll
.Fatalf("%stag:%s connsess:%s error:%s\n",
226 exampid
, ltag
, sngecomm
.Lcs
,
227 e
.Error()) // Handle this ......
231 conn
.SetSubChanCap(senv
.SubChanCap()) // Experiment with this value, YMMV
// All receiving for this queue happens inside receiveMessages.
233 receiveMessages(conn
, qnum
, n
)
235 ll
.Printf("%stag:%s connsess:%s receives_complete qnum:%d\n",
236 exampid
, ltag
, conn
.Session(),
239 // Standard example disconnect sequence
240 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, ltag
, ll
)
// NOTE(review): guard not visible in this listing. Fatalf ends the process.
242 ll
.Fatalf("%stag:%s connsess:%s error:%s\n",
243 exampid
, ltag
, conn
.Session(),
244 e
.Error()) // Handle this ......
// Statistics are labelled with the queue number.
247 sngecomm
.ShowStats(exampid
, "recv_"+fmt
.Sprintf("%d", qnum
), conn
)
// runSender runs the sending side for queue number qnum. In the order
// visible here it: connects with sngecomm.CommonConnect, calls sendMessages,
// disconnects with sngecomm.CommonDisconnect, and reports statistics with
// sngecomm.ShowStats under the label "send_<qnum>".
//
// NOTE(review): the line numbers embedded in this listing skip (for example
// 259 -> 261 and 274 -> 276), so the guards around the Fatalf calls
// (presumably nil checks on e) and the end of the function are not visible.
// Confirm against the repository copy before changing anything.
251 func runSender(qnum
int) {
// ltag identifies this function in the log lines below.
253 ltag
:= tag
+ "-runsender"
// No connection exists yet, so sngecomm.Lcs is logged in the connsess slot.
255 ll
.Printf("%stag:%s connsess:%s start qnum:%d\n",
256 exampid
, ltag
, sngecomm
.Lcs
,
258 // Standard example connect sequence
259 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, ltag
, ll
)
// NOTE(review): guard not visible in this listing. Fatalf ends the process.
261 ll
.Fatalf("%stag:%s connsess:%s error:%s\n",
262 exampid
, ltag
, sngecomm
.Lcs
,
263 e
.Error()) // Handle this ......
// All sending for this queue happens inside sendMessages.
267 sendMessages(conn
, qnum
, n
)
269 ll
.Printf("%stag:%s connsess:%s sends_complete qnum:%d\n",
270 exampid
, ltag
, conn
.Session(),
273 // Standard example disconnect sequence
274 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, ltag
, ll
)
// NOTE(review): guard not visible in this listing. Fatalf ends the process.
276 ll
.Fatalf("%stag:%s connsess:%s error:%s\n",
277 exampid
, ltag
, conn
.Session(),
278 e
.Error()) // Handle this ......
// Statistics are labelled with the queue number.
280 sngecomm
.ShowStats(exampid
, "send_"+fmt
.Sprintf("%d", qnum
), conn
)
288 sngecomm
.ShowRunParms(exampid
)
290 if sngecomm
.Pprof() {
291 cfg
:= profile
.Config
{
295 NoShutdownHook
: false, // Hook SIGINT
297 defer profile
.Start(&cfg
).Stop()
300 ll
.Printf("%stag:%s connsess:%s main_starts\n",
301 exampid
, tag
, sngecomm
.Lcs
)
303 ll
.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
304 exampid
, tag
, sngecomm
.Lcs
,
307 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
308 exampid
, tag
, sngecomm
.Lcs
,
309 runtime
.GOMAXPROCS(-1))
311 if sngecomm
.SetMAXPROCS() {
312 nc
:= runtime
.NumCPU()
313 ll
.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
314 exampid
, tag
, sngecomm
.Lcs
,
316 gmp
:= runtime
.GOMAXPROCS(nc
)
317 ll
.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
318 exampid
, tag
, sngecomm
.Lcs
,
320 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
321 exampid
, tag
, sngecomm
.Lcs
,
322 runtime
.GOMAXPROCS(-1))
325 sw
= sngecomm
.SendWait()
326 rw
= sngecomm
.RecvWait()
327 sf
= sngecomm
.SendFactor()
328 rf
= sngecomm
.RecvFactor()
329 ll
.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
330 exampid
, tag
, sngecomm
.Lcs
,
333 numq
:= sngecomm
.Nqs()
334 nmsgs
= senv
.Nmsgs() // message count
336 ll
.Printf("%stag:%s connsess:%s main_starting_receivers\n",
337 exampid
, tag
, sngecomm
.Lcs
)
338 for q
:= 1; q
<= numq
; q
++ {
342 ll
.Printf("%stag:%s connsess:%s main_started_receivers\n",
343 exampid
, tag
, sngecomm
.Lcs
)
345 ll
.Printf("%stag:%s connsess:%s main_starting_senders\n",
346 exampid
, tag
, sngecomm
.Lcs
)
347 for q
:= 1; q
<= numq
; q
++ {
351 ll
.Printf("%stag:%s connsess:%s main_started_senders\n",
352 exampid
, tag
, sngecomm
.Lcs
)
355 ll
.Printf("%stag:%s connsess:%s main_senders_complete\n",
356 exampid
, tag
, sngecomm
.Lcs
)
358 ll
.Printf("%stag:%s connsess:%s main_receivers_complete\n",
359 exampid
, tag
, sngecomm
.Lcs
)
363 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
364 exampid
, tag
, sngecomm
.Lcs
,