2 // Copyright © 2011-2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. All senders share a single
24 STOMP connection, as do all receivers.
39 "github.com/davecheney/profile"
41 "github.com/gmallard/stompngo"
42 // senv methods could be used in general by stompngo clients.
43 "github.com/gmallard/stompngo/senv"
44 // sngecomm methods are used specifically for these example clients.
45 "github.com/gmallard/stompngo_examples/sngecomm"
49 exampid
= "srmgor_2conn:"
55 // We 'stagger' between each message send and message receive for a random
57 // Vary these for experimental purposes. YMMV.
58 max
int64 = 1e9
// Max stagger time (nanoseconds)
59 min
int64 = max
/ 10 // Min stagger time (nanoseconds)
69 // Possible profile file
70 cpuprofile
= flag
.String("cpuprofile", "", "write cpu profile to file")
72 ll
= log
.New(os
.Stdout
, "E1S1R ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
75 // Send messages to a particular queue
76 func sender(conn
*stompngo
.Connection
, qn
, nmsgs
int) {
77 qns
:= fmt
.Sprintf("%d", qn
) // queue number
78 ll
.Printf("%s connsess:%s sender_starts qn:%d nmsgs:%d\n",
79 exampid
, conn
.Session(), qn
, nmsgs
)
81 // qp := senv.Dest() // queue name prefix
82 d
:= senv
.Dest() + "." + qns
83 ll
.Printf("%s connsess:%s sender_starts d:%s\n",
84 exampid
, conn
.Session(), d
)
85 wh
:= stompngo
.Headers
{"destination", d
,
86 "qnum", qns
} // send Headers
87 if senv
.Persistent() {
88 wh
= wh
.Add("persistent", "true")
91 tmr
:= time
.NewTimer(100 * time
.Hour
)
93 for i
:= 1; i
<= nmsgs
; i
++ {
94 si
:= fmt
.Sprintf("%d", i
)
95 sh
:= append(wh
, "msgnum", si
)
96 // Generate a message to send ...............
97 ll
.Printf("%s connsess:%s sender_message qns:%s si:%s\n",
98 exampid
, conn
.Session(), qns
, si
)
99 e
:= conn
.Send(sh
, string(sngecomm
.Partial()))
102 ll
.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid
, "send error", e
, qn
)
106 runtime
.Gosched() // yield for this example
107 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
108 ll
.Printf("%s connsess:%s send_stagger dt:%v qns:%s\n",
109 exampid
, conn
.Session(),
116 ll
.Printf("%s connsess:%s sender_ends qn:%d nmsgs:%d\n",
117 exampid
, conn
.Session(), qn
, nmsgs
)
121 // Asynchronously process all messages for a given subscription.
122 func receiveWorker(sc
<-chan stompngo
.MessageData
, qns
string, nmsgs
int,
123 qc
chan<- bool, conn
*stompngo
.Connection
, id
string) {
125 tmr
:= time
.NewTimer(100 * time
.Hour
)
127 pbc
:= sngecomm
.Pbc() // Print byte count
130 var md stompngo
.MessageData
131 for i
:= 1; i
<= nmsgs
; i
++ {
135 case md
= <-conn
.MessageData
:
136 // Frames RECEIPT or ERROR not expected here
137 ll
.Fatalf("%s v1:%v\n", exampid
, md
) // Handle this
140 ll
.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid
, "recv read error", md
.Error
, qns
)
143 // Sanity check the queue and message numbers
144 mns
:= fmt
.Sprintf("%d", i
) // message number
145 if !md
.Message
.Headers
.ContainsKV("qnum", qns
) ||
!md
.Message
.Headers
.ContainsKV("msgnum", mns
) {
146 ll
.Fatalf("%s v1:%v v2:%v v3:%v v4:%v\n", exampid
, "Bad Headers", md
.Message
.Headers
, qns
, mns
)
149 // Process the inbound message .................
150 sl
:= len(md
.Message
.Body
)
153 if len(md
.Message
.Body
) < sl
{
154 sl
= len(md
.Message
.Body
)
158 // Handle ACKs if needed
159 if sngecomm
.AckMode() != "auto" {
161 sngecomm
.HandleAck(conn
, ah
, id
)
163 ll
.Printf("%s connsess:%s recv_message body:%s qns:%s msgnum:%s\n",
164 exampid
, conn
.Session(),
165 string(md
.Message
.Body
[0:sl
]),
167 md
.Message
.Headers
.Value("msgnum"))
172 runtime
.Gosched() // yield for this example
173 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, rf
))
174 ll
.Printf("%s connsess:%s recv_stagger dt:%v qns:%s\n",
175 exampid
, conn
.Session(),
185 // Receive messages from a particular queue
186 func receiver(conn
*stompngo
.Connection
, qn
, nmsgs
int) {
187 qns
:= fmt
.Sprintf("%d", qn
) // queue number
188 ll
.Printf("%s connsess:%s recveiver_starts qns:%d nmsgs:%d\n",
189 exampid
, conn
.Session(), qn
, nmsgs
)
191 qp
:= senv
.Dest() // queue name prefix
193 ll
.Printf("%s connsess:%s recveiver_names q:%s qn:%d\n",
194 exampid
, conn
.Session(), q
, qn
)
195 id
:= stompngo
.Uuid() // A unique subscription ID
196 sc
:= sngecomm
.HandleSubscribe(conn
, q
, id
, sngecomm
.AckMode())
197 // Many receivers running under the same connection can cause
198 // (wire read) performance issues. This is *very* dependent on the broker
199 // being used, specifically the broker's algorithm for putting messages on
201 // To alleviate those issues, this strategy insures that messages are
202 // received from the wire as soon as possible. Those messages are then
203 // buffered internally for (possibly later) application processing.
206 if s
:= os
.Getenv("STOMP_CONN2BUFFER"); s
!= "" {
207 i
, e
:= strconv
.ParseInt(s
, 10, 32)
209 ll
.Fatalf("%s v1:%v v2:%v\n", exampid
, "CONN2BUFFER conversion error", e
)
217 ll
.Printf("%s connsess:%s recveiver_mdbuffersize bs:%d qn:%d\n",
218 exampid
, conn
.Session(), bs
, qn
)
220 // Process all inputs async .......
221 // var mc chan stompngo.MessageData
222 mdc
:= make(chan stompngo
.MessageData
, bs
) // MessageData Buffer size
223 dc
:= make(chan bool) // Receive processing done channel
224 go receiveWorker(mdc
, qns
, nmsgs
, dc
, conn
, id
) // Start async processor
225 for i
:= 1; i
<= nmsgs
; i
++ {
226 mdc
<- <-sc
// Receive message data as soon as possible, and internally queue it
228 ll
.Printf("%s connsess:%s recveiver_waitforWorkersBegin qns:%s\n",
229 exampid
, conn
.Session(), qns
)
230 <-dc
// Wait until receive processing is done for this queue
231 ll
.Printf("%s connsess:%s recveiver_waitforWorkersEnd qns:%s\n",
232 exampid
, conn
.Session(), qns
)
235 sngecomm
.HandleUnsubscribe(conn
, q
, id
)
238 ll
.Printf("%s connsess:%s recveiver_ends qns:%s\n",
239 exampid
, conn
.Session(), qns
)
243 func startSenders(qn
int) {
244 ll
.Printf("%s startSenders_starts qn:%d\n",
248 h
, p
:= senv
.HostAndPort() // host and port
249 hap
:= net
.JoinHostPort(h
, p
)
250 n
, e
:= net
.Dial("tcp", hap
)
252 ll
.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid
, "startSenders netconnect error", e
, qn
) // Handle this ......
256 ch
:= sngecomm
.ConnectHeaders()
257 ll
.Printf("%s startSenders_sdata vhost:%s protocol:%s qn:%d\n",
258 exampid
, senv
.Vhost(), senv
.Protocol(), qn
)
259 conn
, e
:= stompngo
.Connect(n
, ch
)
261 ll
.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid
, "startSenders stompconnect error", e
, qn
) // Handle this ......
263 ll
.Printf("%s connsess:%s startSenders_connection qn:%d\n",
264 exampid
, conn
.Session(), qn
)
265 nmsgs
:= senv
.Nmsgs() // message count
266 ll
.Printf("%s connsess:%s startSenders_message_count nmsgs:%d qn:%d\n",
267 exampid
, conn
.Session(), nmsgs
, qn
)
268 for i
:= 1; i
<= qn
; i
++ { // all queues
270 go sender(conn
, i
, nmsgs
)
274 // Disconnect from Stomp server
275 e
= conn
.Disconnect(stompngo
.Headers
{})
277 ll
.Printf("%s v1:%v v2:%v v3:%v\n", exampid
, "startSenders disconnect error", e
, qn
) // Handle this ......
282 ll
.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid
, "startSenders netclose error", e
, qn
) // Handle this ......
285 ll
.Printf("%s startSenders_ends qn:%d\n",
287 sngecomm
.ShowStats(exampid
, "startSenders", conn
)
291 func startReceivers(qn
int) {
292 ll
.Printf("%s startReceivers_starts qn:%d\n",
296 h
, p
:= senv
.HostAndPort() // host and port
297 n
, e
:= net
.Dial("tcp", net
.JoinHostPort(h
, p
))
299 ll
.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid
, "startReceivers nectonnr:", e
, qn
) // Handle this ......
301 ch
:= sngecomm
.ConnectHeaders()
302 ll
.Printf("%s startReceivers_sdata vhost:%s protocol:%s qn:%dn",
303 exampid
, senv
.Vhost(), senv
.Protocol(), qn
)
304 conn
, e
:= stompngo
.Connect(n
, ch
)
306 ll
.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid
, "startReceivers stompconnectr:", e
, qn
) // Handle this ......
308 ll
.Printf("%s connsess:%s startReceivers_conndata qn:%d\n",
309 exampid
, conn
.Session(), qn
)
310 nmsgs
:= senv
.Nmsgs() // get message count
311 ll
.Printf("%s connsess:%s startReceivers_message_count nmsgs:%d qn:%d\n",
312 exampid
, conn
.Session(), nmsgs
, qn
)
313 for i
:= 1; i
<= qn
; i
++ { // all queues
315 go receiver(conn
, i
, nmsgs
)
319 // Disconnect from Stomp server
320 e
= conn
.Disconnect(stompngo
.Headers
{})
322 ll
.Printf("%s v1:%v v2:%v v3:%v\n", exampid
, "startReceivers disconnect error", e
, qn
) // Handle this ......
327 ll
.Printf("%s v1:%v v2:%v v3:%v\n", exampid
, "startReceivers netclose error", e
, qn
) // Handle this ......
330 ll
.Printf("%s startReceivers_ends qn:%d\n",
332 sngecomm
.ShowStats(exampid
, "startReceivers", conn
)
336 // Show a number of writers and readers operating concurrently from unique
339 sngecomm
.ShowRunParms(exampid
)
341 if sngecomm
.Pprof() {
342 cfg
:= profile
.Config
{
346 NoShutdownHook
: false, // Hook SIGINT
348 defer profile
.Start(&cfg
).Stop()
352 ll
.Printf("%s v1:%v\n", exampid
, "main starts")
354 if sngecomm
.SetMAXPROCS() {
355 nc
:= runtime
.NumCPU()
356 ll
.Printf("%s v1:%v v2:%v\n", exampid
, "main number of CPUs is:", nc
)
357 c
:= runtime
.GOMAXPROCS(nc
)
358 ll
.Printf("%s v1:%v v2:%v\n", exampid
, "main previous number of GOMAXPROCS is:", c
)
359 ll
.Printf("%s v1:%v v2:%v\n", exampid
, "main current number of GOMAXPROCS is:", runtime
.GOMAXPROCS(-1))
362 sw
= sngecomm
.SendWait()
363 rw
= sngecomm
.RecvWait()
364 sf
= sngecomm
.SendFactor()
365 rf
= sngecomm
.RecvFactor()
366 ll
.Printf("%s v1:%v v2:%v v3:%v v4:%v v5:%v\n", exampid
, "main Sleep Factors", "send", sf
, "recv", rf
)
375 ll
.Printf("%s v1:%v v2:%v\n", exampid
, "main ends", time
.Since(tn
))