2 // Copyright © 2012-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. Each sender and receiver
24 operates under a unique network connection.
28 # A few queues and a few messages:
29 STOMP_NQS=5 STOMP_NMSGS=10 go run srmgor_manyconn.go
43 "github.com/gmallard/stompngo"
44 // senv methods could be used in general by stompngo clients.
45 "github.com/gmallard/stompngo/senv"
46 // sngecomm methods are used specifically for these example clients.
47 "github.com/gmallard/stompngo_examples/sngecomm"
// Package-level settings shared by the functions below.
// NOTE(review): the enclosing declaration keyword (presumably a `var (`
// group) and several sibling declarations (tag, nmsgs, sw, rw, sf, rf) are
// not visible in this excerpt - confirm against the complete file.
//
// exampid prefixes every log line; with its trailing ": " sliced off it also
// forms part of each queue destination name.
51 exampid
= "srmgor_manyconn: "
56 // We 'stagger' between each message send and message receive for a random
// interval; the senders and receivers compute it with
// sngecomm.ValueBetween(min, max, ...) and convert it to a time.Duration.
58 // Vary these for experimental purposes. YMMV.
59 max
int64 = 1e9
// Max stagger time (nanoseconds)
60 min
int64 = max
/ 10 // Min stagger time (nanoseconds)
// ll is the logger used throughout this file: it writes to os.Stdout with the
// prefix "EMSMR " and the date, microsecond and short-file flags.
73 ll
= log
.New(os
.Stdout
, "EMSMR ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
// sendMessages sends messages numbered 1..nmsgs on conn to one queue.
//
// The destination is sngecomm.Dest() + "." + exampid (without its trailing
// ": ") + "." + qnum. Every message carries "destination", "qnum" and
// "msgnum" headers, plus "persistent: true" when senv.Persistent() reports
// true. The message body is string(sngecomm.Partial()).
//
// Parameters:
//   conn - the STOMP connection to send on.
//   qnum - the queue number; used in the destination name and "qnum" header.
//   nc   - the underlying network connection; not referenced in the visible
//          lines of this function.
//
// NOTE(review): several original lines are missing from this excerpt (the
// trailing arguments of some Printf calls, the condition guarding the
// send_error Fatalf, whatever waits on tmr, and the closing braces).
// Confirm against the complete file before changing any code here.
78 func sendMessages(conn
*stompngo
.Connection
, qnum
int, nc net
.Conn
) {
79 ltag
:= tag
+ "-sendmessages"
81 qns
:= fmt
.Sprintf("%d", qnum
) // queue number
// Queue destination for this sender; receiveMessages builds the same name.
82 d
:= sngecomm
.Dest() + "." + string(exampid
[:len(exampid
)-2]) + "." + qns
83 ll
.Printf("%stag:%s connsess:%s start d:%s qnum:%d\n",
84 exampid
, ltag
, conn
.Session(),
86 wh
:= stompngo
.Headers
{"destination", d
,
87 "qnum", qns
} // send Headers
88 if senv
.Persistent() {
89 wh
= wh
.Add("persistent", "true")
// Timer created with a 100 hour initial duration.
// NOTE(review): presumably it is reset to the stagger interval dt and waited
// on further down - those lines are not visible in this excerpt.
92 tmr
:= time
.NewTimer(100 * time
.Hour
)
94 for mc
:= 1; mc
<= nmsgs
; mc
++ {
95 mcs
:= fmt
.Sprintf("%d", mc
)
// Per-message headers: the base headers plus this message's number.
96 sh
:= append(wh
, "msgnum", mcs
)
97 // Generate a message to send ...............
99 ll
.Printf("%stag:%s connsess:%s message mc:%d qnum:%d\n",
100 exampid
, ltag
, conn
.Session(),
102 e
:= conn
.Send(sh
, string(sngecomm
.Partial()))
// NOTE(review): the guard for this Fatalf (presumably a nil check on e) is
// not visible in this excerpt.
104 ll
.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
105 exampid
, ltag
, conn
.Session(),
106 qnum
, e
.Error()) // Handle this ......
112 runtime
.Gosched() // yield for this example
// Stagger interval for this iteration, derived from min, max and sf.
113 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
114 ll
.Printf("%stag:%s connsess:%s send_stagger dt:%v qnum:%d mc:%d\n",
115 exampid
, ltag
, conn
.Session(),
// receiveMessages subscribes conn to one queue, loops over message numbers
// 1..nmsgs, and then unsubscribes.
//
// The destination is built the same way as in sendMessages, and the
// subscription uses a freshly generated id from stompngo.Uuid(). The visible
// checks on a received frame are:
//   - its command must be stompngo.MESSAGE;
//   - its headers must contain "qnum" equal to this queue number and
//     "msgnum" equal to the current loop counter.
// A failed check ends the program through ll.Fatalf. When sngecomm.AckMode()
// is not "auto", sngecomm.HandleAck is called for the message.
//
// Parameters:
//   conn - the STOMP connection to receive on.
//   qnum - the queue number; used in the destination name and header check.
//   nc   - the underlying network connection; not referenced in the visible
//          lines of this function.
//
// NOTE(review): several original lines are missing from this excerpt (the
// select header and at least one of its cases, the guards around two Fatalf
// calls, the definition of ah, the use of pbc, whatever waits on tmr, and
// the closing braces). Confirm against the complete file before changing
// any code here.
123 func receiveMessages(conn
*stompngo
.Connection
, qnum
int, nc net
.Conn
) {
124 ltag
:= tag
+ "-receivemessages"
126 qns
:= fmt
.Sprintf("%d", qnum
) // queue number
127 d
:= sngecomm
.Dest() + "." + string(exampid
[:len(exampid
)-2]) + "." + qns
128 id
:= stompngo
.Uuid() // A unique subscription ID
130 ll
.Printf("%stag:%s connsess:%s receiveMessages_start id:%s d:%s qnum:%d nmsgs:%d\n",
131 exampid
, ltag
, conn
.Session(),
// Subscribe using the configured ack mode; sc holds the value returned by
// sngecomm.HandleSubscribe.
// NOTE(review): presumably sc is the subscription's message channel - its
// use is not visible in this excerpt.
134 sc
:= sngecomm
.HandleSubscribe(conn
, d
, id
, sngecomm
.AckMode())
136 pbc
:= sngecomm
.Pbc() // Print byte count
// Timer created with a 100 hour initial duration.
// NOTE(review): presumably reset to the stagger interval dt and waited on
// further down - those lines are not visible in this excerpt.
139 tmr
:= time
.NewTimer(100 * time
.Hour
)
140 var md stompngo
.MessageData
141 for mc
:= 1; mc
<= nmsgs
; mc
++ {
// NOTE(review): the enclosing select and any other case (presumably a
// receive from sc) are not visible in this excerpt.
145 case md
= <-conn
.MessageData
:
146 // Frames RECEIPT or ERROR not expected here
// NOTE(review): the log tag below says send_error although this is the
// receive path.
147 ll
.Fatalf("%stag:%s connsess:%s send_error qns:%v md:%v",
148 exampid
, ltag
, conn
.Session(),
149 qns
, md
) // Handle this ......
// NOTE(review): the guard for this Fatalf and its trailing arguments are not
// visible in this excerpt.
152 ll
.Fatalf("%stag:%s connsess:%s receive_error qns:%v error:%v\n",
153 exampid
, ltag
, conn
.Session(),
// Only MESSAGE frames are acceptable here.
157 if md
.Message
.Command
!= stompngo
.MESSAGE
{
158 ll
.Fatalf("%stag:%s connsess:%s bad_frame qns:%s mc:%d md:%v\n",
159 exampid
, ltag
, conn
.Session(),
163 mcs
:= fmt
.Sprintf("%d", mc
) // message number
// The message must belong to this queue and carry the expected number.
// NOTE(review): this Fatalf logs tag rather than ltag, unlike its siblings.
164 if !md
.Message
.Headers
.ContainsKV("qnum", qns
) ||
!md
.Message
.Headers
.ContainsKV("msgnum", mcs
) {
165 ll
.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v md:%v",
166 exampid
, tag
, conn
.Session(),
167 qns
, mcs
, md
) // Handle this ......
170 // Process the inbound message .................
// sl is the number of body bytes logged below (Body[0:sl]); the visible
// clamp keeps it from exceeding the body length.
// NOTE(review): the lines that apply pbc to sl are not visible here.
171 sl
:= len(md
.Message
.Body
)
174 if len(md
.Message
.Body
) < sl
{
175 sl
= len(md
.Message
.Body
)
178 ll
.Printf("%stag:%s connsess:%s receiveMessages_msg d:%s body:%s qnum:%d msgnum:%s\n",
179 exampid
, ltag
, conn
.Session(),
180 d
, string(md
.Message
.Body
[0:sl
]), qnum
,
181 md
.Message
.Headers
.Value("msgnum"))
185 // Handle ACKs if needed
186 if sngecomm
.AckMode() != "auto" {
// NOTE(review): ah is not defined in the visible lines of this excerpt.
188 sngecomm
.HandleAck(conn
, ah
, id
)
195 runtime
.Gosched() // yield for this example
// Stagger interval for this iteration, derived from min, max and rf.
196 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, rf
))
197 ll
.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s mc:%d\n",
198 exampid
, ltag
, conn
.Session(),
204 ll
.Printf("%stag:%s connsess:%s end d:%s qnum:%d nmsgs:%d\n",
205 exampid
, ltag
, conn
.Session(),
// Remove the subscription created above.
209 sngecomm
.HandleUnsubscribe(conn
, d
, id
)
// runReceiver runs one receiver for queue number qnum on its own connection.
//
// Visible sequence: connect with sngecomm.CommonConnect, set the
// subscription channel capacity from senv.SubChanCap(), call
// receiveMessages, disconnect with sngecomm.CommonDisconnect, then report
// statistics through sngecomm.ShowStats under the label "recv_<qnum>".
// Connect and disconnect failures are reported with ll.Fatalf.
//
// NOTE(review): the guards around both Fatalf calls, the trailing arguments
// of some Printf calls, and the end of the function are not visible in this
// excerpt. Confirm against the complete file before changing any code here.
213 func runReceiver(qnum
int) {
214 ltag
:= tag
+ "-runreceiver"
216 ll
.Printf("%stag:%s connsess:%s start qnum:%d\n",
217 exampid
, ltag
, sngecomm
.Lcs
,
220 // Standard example connect sequence
// n is passed on to receiveMessages as its net.Conn parameter; conn is the
// STOMP connection.
221 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, ltag
, ll
)
// NOTE(review): the guard for this Fatalf is not visible in this excerpt.
223 ll
.Fatalf("%stag:%s connsess:%s error:%s\n",
224 exampid
, ltag
, sngecomm
.Lcs
,
225 e
.Error()) // Handle this ......
229 conn
.SetSubChanCap(senv
.SubChanCap()) // Experiment with this value, YMMV
231 receiveMessages(conn
, qnum
, n
)
233 ll
.Printf("%stag:%s connsess:%s receives_complete qnum:%d\n",
234 exampid
, ltag
, conn
.Session(),
237 // Standard example disconnect sequence
238 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, ltag
, ll
)
// NOTE(review): the guard for this Fatalf is not visible in this excerpt.
240 ll
.Fatalf("%stag:%s connsess:%s error:%s\n",
241 exampid
, ltag
, conn
.Session(),
242 e
.Error()) // Handle this ......
245 sngecomm
.ShowStats(exampid
, "recv_"+fmt
.Sprintf("%d", qnum
), conn
)
// runSender runs one sender for queue number qnum on its own connection.
//
// Visible sequence: connect with sngecomm.CommonConnect, call sendMessages,
// disconnect with sngecomm.CommonDisconnect, then report statistics through
// sngecomm.ShowStats under the label "send_<qnum>". Connect and disconnect
// failures are reported with ll.Fatalf.
//
// NOTE(review): the guards around both Fatalf calls, the trailing arguments
// of some Printf calls, and the end of the function are not visible in this
// excerpt. Confirm against the complete file before changing any code here.
249 func runSender(qnum
int) {
251 ltag
:= tag
+ "-runsender"
253 ll
.Printf("%stag:%s connsess:%s start qnum:%d\n",
254 exampid
, ltag
, sngecomm
.Lcs
,
256 // Standard example connect sequence
// n is passed on to sendMessages as its net.Conn parameter; conn is the
// STOMP connection.
257 n
, conn
, e
:= sngecomm
.CommonConnect(exampid
, ltag
, ll
)
// NOTE(review): the guard for this Fatalf is not visible in this excerpt.
259 ll
.Fatalf("%stag:%s connsess:%s error:%s\n",
260 exampid
, ltag
, sngecomm
.Lcs
,
261 e
.Error()) // Handle this ......
265 sendMessages(conn
, qnum
, n
)
267 ll
.Printf("%stag:%s connsess:%s sends_complete qnum:%d\n",
268 exampid
, ltag
, conn
.Session(),
271 // Standard example disconnect sequence
272 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, ltag
, ll
)
// NOTE(review): the guard for this Fatalf is not visible in this excerpt.
274 ll
.Fatalf("%stag:%s connsess:%s error:%s\n",
275 exampid
, ltag
, conn
.Session(),
276 e
.Error()) // Handle this ......
278 sngecomm
.ShowStats(exampid
, "send_"+fmt
.Sprintf("%d", qnum
), conn
)
// NOTE(review): the enclosing function header is not visible in this
// excerpt; the main_* log tags suggest this is the body of main - confirm.
//
// Visible sequence: show the run parameters, log the profiling and
// GOMAXPROCS settings (optionally raising GOMAXPROCS to the CPU count), load
// the wait flags, sleep factors, queue count and message count, then loop
// over queue numbers 1..numq once for receivers and once for senders, log
// completion and elapsed time, and sleep for 250ms.
//
// NOTE(review): the bodies of both loops, whatever waits for the senders and
// receivers to finish, and the trailing arguments of several Printf calls
// are not visible in this excerpt.
286 sngecomm
.ShowRunParms(exampid
)
288 ll
.Printf("%stag:%s connsess:%s main_starts\n",
289 exampid
, tag
, sngecomm
.Lcs
)
291 ll
.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
292 exampid
, tag
, sngecomm
.Lcs
,
// GOMAXPROCS(-1) is logged as the current setting.
295 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
296 exampid
, tag
, sngecomm
.Lcs
,
297 runtime
.GOMAXPROCS(-1))
// When sngecomm.SetMAXPROCS() reports true, set GOMAXPROCS to the CPU count;
// gmp receives the value returned by that call.
299 if sngecomm
.SetMAXPROCS() {
300 nc
:= runtime
.NumCPU()
301 ll
.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
302 exampid
, tag
, sngecomm
.Lcs
,
304 gmp
:= runtime
.GOMAXPROCS(nc
)
305 ll
.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
306 exampid
, tag
, sngecomm
.Lcs
,
308 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
309 exampid
, tag
, sngecomm
.Lcs
,
310 runtime
.GOMAXPROCS(-1))
// Load the wait flags and sleep factors. These are plain assignments, so the
// variables are declared elsewhere; sf and rf are read by sendMessages and
// receiveMessages when computing the stagger interval.
313 sw
= sngecomm
.SendWait()
314 rw
= sngecomm
.RecvWait()
315 sf
= sngecomm
.SendFactor()
316 rf
= sngecomm
.RecvFactor()
317 ll
.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
318 exampid
, tag
, sngecomm
.Lcs
,
// numq is the number of queues; nmsgs is the per-queue message count used by
// the send and receive loops.
321 numq
:= sngecomm
.Nqs()
322 nmsgs
= senv
.Nmsgs() // message count
324 ll
.Printf("%stag:%s connsess:%s main_starting_receivers\n",
325 exampid
, tag
, sngecomm
.Lcs
)
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// starts one receiver per queue number - confirm.
326 for q
:= 1; q
<= numq
; q
++ {
330 ll
.Printf("%stag:%s connsess:%s main_started_receivers\n",
331 exampid
, tag
, sngecomm
.Lcs
)
333 ll
.Printf("%stag:%s connsess:%s main_starting_senders\n",
334 exampid
, tag
, sngecomm
.Lcs
)
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// starts one sender per queue number - confirm.
335 for q
:= 1; q
<= numq
; q
++ {
339 ll
.Printf("%stag:%s connsess:%s main_started_senders\n",
340 exampid
, tag
, sngecomm
.Lcs
)
343 ll
.Printf("%stag:%s connsess:%s main_senders_complete\n",
344 exampid
, tag
, sngecomm
.Lcs
)
346 ll
.Printf("%stag:%s connsess:%s main_receivers_complete\n",
347 exampid
, tag
, sngecomm
.Lcs
)
351 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
352 exampid
, tag
, sngecomm
.Lcs
,
// Final 250ms pause; the source does not state the reason.
354 time
.Sleep(250 * time
.Millisecond
)