2 // Copyright © 2011-2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Send and receive many STOMP messages using multiple queues and goroutines
24 to service each send or receive instance. All senders and receivers share the
25 same STOMP connection.
38 "github.com/gmallard/stompngo"
39 // senv methods could be used in general by stompngo clients.
40 "github.com/gmallard/stompngo/senv"
41 // sngecomm methods are used specifically for these example clients.
42 "github.com/gmallard/stompngo_examples/sngecomm"
46 ll
= log
.New(os
.Stdout
, "ECNDS ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
48 exampid
= "srmgor_1conn: "
54 // We 'stagger' between each message send and message receive for a random
56 // Vary these for experimental purposes. YMMV.
57 max
int64 = 1e9
// Max stagger time (nanoseconds)
58 min
int64 = max
/ 10 // Min stagger time (nanoseconds)
69 n net
.Conn
// Network Connection
70 conn
*stompngo
.Connection
// Stomp Connection
77 // Send messages to a particular queue
78 func sender(qn
, mc
int) {
79 ltag
:= tag
+ "-sender"
81 qns
:= fmt
.Sprintf("%d", qn
) // string queue number
82 id
:= stompngo
.Uuid() // A unique sender id
83 d
:= sngecomm
.Dest() + "." + string(exampid
[:len(exampid
)-2]) + "." + qns
85 ll
.Printf("%stag:%s connsess:%s queue_info id:%v d:%v qnum:%v mc:%v\n",
86 exampid
, ltag
, conn
.Session(),
89 wh
:= stompngo
.Headers
{"destination", d
, "senderId", id
,
90 "qnum", qns
} // send Headers
91 if senv
.Persistent() {
92 wh
= wh
.Add("persistent", "true")
95 tmr
:= time
.NewTimer(100 * time
.Hour
)
97 for i
:= 1; i
<= mc
; i
++ {
98 si
:= fmt
.Sprintf("%d", i
)
99 sh
:= append(wh
, "msgnum", si
)
100 // Generate a message to send ...............
101 ll
.Printf("%stag:%s connsess:%s send_headers id:%v d:%v qnum:%v headers:%v\n",
102 exampid
, ltag
, conn
.Session(),
104 e
:= conn
.Send(sh
, string(sngecomm
.Partial()))
106 ll
.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
107 exampid
, tag
, conn
.Session(),
108 qn
, e
.Error()) // Handle this ......
114 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
115 ll
.Printf("%stag:%s connsess:%s send_stagger id:%v d:%v qnum:%v stagger:%v\n",
116 exampid
, ltag
, conn
.Session(),
124 ll
.Printf("%stag:%s connsess:%s finish_info id:%v d:%v qnum:%v mc:%v\n",
125 exampid
, ltag
, conn
.Session(),
130 // Receive messages from a particular queue
131 func receiver(qn
, mc
int) {
132 ltag
:= tag
+ "-receiver"
134 qns
:= fmt
.Sprintf("%d", qn
) // string queue number
135 pbc
:= sngecomm
.Pbc()
136 id
:= stompngo
.Uuid() // A unique subscription ID
137 d
:= sngecomm
.Dest() + "." + string(exampid
[:len(exampid
)-2]) + "." + qns
139 ll
.Printf("%stag:%s connsess:%s queue_info id:%v d:%v qnum:%v mc:%v\n",
140 exampid
, ltag
, conn
.Session(),
143 sc
:= sngecomm
.HandleSubscribe(conn
, d
, id
, sngecomm
.AckMode())
144 ll
.Printf("%stag:%s connsess:%s subscribe_complete id:%v d:%v qnum:%v mc:%v\n",
145 exampid
, ltag
, conn
.Session(),
148 tmr
:= time
.NewTimer(100 * time
.Hour
)
149 var md stompngo
.MessageData
151 for i
:= 1; i
<= mc
; i
++ {
152 ll
.Printf("%stag:%s connsess:%s recv_ranchek id:%v d:%v qnum:%v mc:%v chlen:%v chcap:%v\n",
153 exampid
, ltag
, conn
.Session(),
154 id
, d
, qn
, mc
, len(sc
), cap(sc
))
158 case md
= <-conn
.MessageData
:
159 // A RECEIPT or ERROR frame is unexpected here
160 ll
.Fatalf("%stag:%s connsess:%s bad_frame qnum:%v headers:%v body:%s",
161 exampid
, tag
, conn
.Session(),
162 qn
, md
.Message
.Headers
, md
.Message
.Body
) // Handle this ......
165 ll
.Fatalf("%stag:%s connsess:%s recv_error qnum:%v error:%v",
166 exampid
, tag
, conn
.Session(),
167 qn
, md
.Error
) // Handle this ......
170 // Process the inbound message .................
171 ll
.Printf("%stag:%s connsess:%s recv_message qnum:%v i:%v\n",
172 exampid
, tag
, conn
.Session(),
176 if len(md
.Message
.Body
) < maxlen
{
177 maxlen
= len(md
.Message
.Body
)
179 ss
:= string(md
.Message
.Body
[0:maxlen
])
180 ll
.Printf("%stag:%s connsess:%s payload qnum:%v body:%s\n",
181 exampid
, tag
, conn
.Session(),
186 // Sanity check the message Command, and the queue and message numbers
187 mns
:= fmt
.Sprintf("%d", i
) // message number
188 if md
.Message
.Command
!= stompngo
.MESSAGE
{
189 ll
.Fatalf("%stag:%s connsess:%s bad_frame qnum:%v command:%v headers:%v body:%v\n",
190 exampid
, tag
, conn
.Session(),
191 qn
, md
.Message
.Command
, md
.Message
.Headers
, string(md
.Message
.Body
)) // Handle this ......
194 if !md
.Message
.Headers
.ContainsKV("qnum", qns
) ||
!md
.Message
.Headers
.ContainsKV("msgnum", mns
) {
195 ll
.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v command:%v headers:%v body:%v\n",
196 exampid
, tag
, conn
.Session(),
197 qns
, mns
, md
.Message
.Command
, md
.Message
.Headers
, string(md
.Message
.Body
)) // Handle this ......) // Handle this ......
205 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, rf
))
206 ll
.Printf("%stag:%s connsess:%s recv_stagger id:%v d:%v qnum:%v stagger:%v\n",
207 exampid
, ltag
, conn
.Session(),
214 // Handle ACKs if needed
215 if sngecomm
.AckMode() != "auto" {
216 sngecomm
.HandleAck(conn
, md
.Message
.Headers
, id
)
220 sngecomm
.HandleUnsubscribe(conn
, d
, id
)
221 ll
.Printf("%stag:%s connsess:%s unsubscribe_complete id:%v d:%v qnum:%v mc:%v\n",
222 exampid
, ltag
, conn
.Session(),
226 ll
.Printf("%stag:%s connsess:%s recv_end id:%v d:%v qnum:%v mc:%v\n",
227 exampid
, ltag
, conn
.Session(),
234 Start all sender go routines.
236 func startSenders(nqs
int) {
237 ltag
:= tag
+ "-startsenders"
239 ll
.Printf("%stag:%s connsess:%s queue_count nqs:%v\n",
240 exampid
, ltag
, conn
.Session(),
243 mc
:= senv
.Nmsgs() // message count
244 ll
.Printf("%stag:%s connsess:%s message_count mc:%v\n",
245 exampid
, ltag
, conn
.Session(),
247 for i
:= 1; i
<= nqs
; i
++ { // all queues
253 ll
.Printf("%stag:%s connsess:%s ends nqs:%v mc:%v\n",
254 exampid
, ltag
, conn
.Session(),
260 Start all receiver go routines.
262 func startReceivers(nqs
int) {
263 ltag
:= tag
+ "-startreceivers"
265 ll
.Printf("%stag:%s connsess:%s queue_count nqs:%v\n",
266 exampid
, ltag
, conn
.Session(),
269 mc
:= senv
.Nmsgs() // get message count
270 ll
.Printf("%stag:%s connsess:%s message_count mc:%v\n",
271 exampid
, ltag
, conn
.Session(),
274 for i
:= 1; i
<= nqs
; i
++ { // all queues
280 ll
.Printf("%stag:%s connsess:%s ends nqs:%v mc:%v\n",
281 exampid
, ltag
, conn
.Session(),
286 // Show a number of writers and readers operating concurrently from unique
292 sngecomm
.ShowRunParms(exampid
)
294 ll
.Printf("%stag:%s connsess:%s main_starts\n",
295 exampid
, tag
, sngecomm
.Lcs
)
297 ll
.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
298 exampid
, tag
, sngecomm
.Lcs
,
301 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
302 exampid
, tag
, sngecomm
.Lcs
,
303 runtime
.GOMAXPROCS(-1))
305 if sngecomm
.SetMAXPROCS() {
306 nc
:= runtime
.NumCPU()
307 ll
.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
308 exampid
, tag
, sngecomm
.Lcs
,
310 gmp
:= runtime
.GOMAXPROCS(nc
)
311 ll
.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
312 exampid
, tag
, sngecomm
.Lcs
,
314 ll
.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
315 exampid
, tag
, sngecomm
.Lcs
,
316 runtime
.GOMAXPROCS(-1))
319 sw
= sngecomm
.SendWait()
320 rw
= sngecomm
.RecvWait()
321 sf
= sngecomm
.SendFactor()
322 rf
= sngecomm
.RecvFactor()
323 ll
.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
324 exampid
, tag
, sngecomm
.Lcs
,
327 nqs
:= sngecomm
.Nqs()
329 // Standard example connect sequence
331 n
, conn
, e
= sngecomm
.CommonConnect(exampid
, tag
, ll
)
334 ll
.Printf("%stag:%s connsess:%s Connect Response headers:%v body%s\n",
335 exampid
, tag
, conn
.Session(), conn
.ConnectResponse
.Headers
,
336 string(conn
.ConnectResponse
.Body
))
338 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
339 exampid
, tag
, sngecomm
.Lcs
,
340 e
.Error()) // Handle this ......
343 // Many receivers running under the same connection can cause
344 // (wire read) performance issues. This is *very* dependent on the broker
345 // being used, specifically the broker's algorithm for putting messages on
347 // To alleviate those issues, this strategy insures that messages are
348 // received from the wire as soon as possible. Those messages are then
349 // buffered internally for (possibly later) application processing. In
350 // this example, buffering occurs in the stompngo package.
351 conn
.SetSubChanCap(senv
.SubChanCap()) // Experiment with this value, YMMV
355 go startReceivers(nqs
)
359 // Standard example disconnect sequence
360 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, tag
, ll
)
362 ll
.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
363 exampid
, tag
, conn
.Session(),
364 e
.Error()) // Handle this ......
367 sngecomm
.ShowStats(exampid
, tag
, conn
)
369 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
370 exampid
, tag
, conn
.Session(),
373 time
.Sleep(250 * time
.Millisecond
)