2 // Copyright © 2011-2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Send and receive many STOMP messages using multiple queues and goroutines
24 to service each send or receive instance. All senders and receivers share the
25 same STOMP connection.
38 "github.com/davecheney/profile"
40 "github.com/gmallard/stompngo"
41 // senv methods could be used in general by stompngo clients.
42 "github.com/gmallard/stompngo/senv"
43 // sngecomm methods are used specifically for these example clients.
44 "github.com/gmallard/stompngo_examples/sngecomm"
// ll is the logger used by every visible log call in this example: it writes
// to os.Stdout with the prefix "ECNDS " and the date, microsecond-time and
// short-file flags.
48 ll
= log
.New(os
.Stdout
, "ECNDS ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
// exampid is the tag passed as the first argument of the visible log calls.
50 exampid
= "srmgor_1conn:"
56 // We 'stagger' between each message send and message receive for a random
58 // Vary these for experimental purposes. YMMV.
// NOTE(review): the names max and min coincide with the Go 1.21+ builtins and
// shadow them wherever these declarations are in scope. The enclosing
// declaration block is not visible in this listing, so confirm the scope.
59 max
int64 = 1e9
// Max stagger time (nanoseconds)
// min is derived from max: one tenth of it (1e8 ns with the value above).
60 min
int64 = max
/ 10 // Min stagger time (nanoseconds)
// n and conn are assigned in main (net.Dial and stompngo.Connect). conn is the
// single STOMP connection that sender and receiver both use.
71 n net
.Conn
// Network Connection
72 conn
*stompngo
.Connection
// Stomp Connection
77 // Send messages to a particular queue
// sender publishes messages numbered 1..mc to the destination
// senv.Dest() + "." + qn over the shared connection conn.
//
//	qn: queue number; used in the destination name and the "qnum" header.
//	mc: number of messages to send.
//
// NOTE(review): this listing omits several lines of the function (by its own
// numbering: 82, 89-90, 92, 99, 101-102, 105-112, 114 onward), including the
// closing brace. The comments below describe only the visible statements.
78 func sender(qn
, mc
int) {
79 qns
:= fmt
.Sprintf("%d", qn
) // string queue number
80 id
:= stompngo
.Uuid() // A unique sender id
81 ll
.Printf("%s id:%s send_start_queue_number qn:%d\n", exampid
, id
, qn
)
// Destination name: the configured base destination plus ".<queue number>".
83 d
:= senv
.Dest() + "." + qns
84 ll
.Printf("%s id:%s send_queue_name:%s qn:%d\n", exampid
, id
, d
, qn
)
// Base headers shared by every message from this sender: destination, this
// sender's id, and the queue number (checked by receiver via "qnum").
85 wh
:= stompngo
.Headers
{"destination", d
, "senderId", id
,
86 "qnum", qns
} // send Headers
// Add the "persistent" header only when senv.Persistent() reports true.
87 if senv
.Persistent() {
88 wh
= wh
.Add("persistent", "true")
// Timer created with a 100-hour duration.
// NOTE(review): the statements that use tmr are not visible here; presumably
// it is reset to dt to implement the stagger delay - confirm.
91 tmr
:= time
.NewTimer(100 * time
.Hour
)
93 for i
:= 1; i
<= mc
; i
++ {
94 si
:= fmt
.Sprintf("%d", i
)
// Per-message headers: the base headers plus this message's number.
// NOTE(review): append can write into wh's backing array when wh has spare
// capacity, so every iteration may reuse the same storage. That is only safe
// if Send does not retain sh after returning - confirm against stompngo.
95 sh
:= append(wh
, "msgnum", si
)
96 // Generate a message to send ...............
97 ll
.Printf("%s id:%s send_message qn:%d msgnum:%s\n", exampid
, id
, qn
, si
)
// The payload comes from sngecomm.Partial(), converted to a string.
98 e
:= conn
.Send(sh
, string(sngecomm
.Partial()))
// A send failure terminates the whole program (Fatalln).
// NOTE(review): the guarding condition (listing line 99) is not visible here.
100 ll
.Fatalln(exampid
, id
, "send error", e
, qn
)
// Stagger duration derived from min, max and the send factor sf.
// NOTE(review): the exact semantics of ValueBetween are not shown in this file.
103 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
104 ll
.Printf("%s send_stagger dt:%v qn:%d id:%s\n",
113 ll
.Printf("%s id:%s send_end_queue_number qn:%d\n", exampid
, id
, qn
)
117 // Receive messages from a particular queue
// receiver subscribes to the destination senv.Dest() + "." + qn on the shared
// connection conn, loops mc times validating each inbound message, and then
// unsubscribes.
//
//	qn: queue number; must match the "qnum" header of every message received.
//	mc: number of messages expected; message i must carry "msgnum" == i.
//
// NOTE(review): this listing omits several lines of the function (by its own
// numbering: 122, 125, 127, 130, 134-136, 140-141, 143-144, 147-148, 151,
// 154-155, 160, 163-165, 168-174, 178-180, 182-183, 185 onward), including the
// select header and the closing brace. The comments below describe only the
// visible statements.
118 func receiver(qn
, mc
int) {
119 qns
:= fmt
.Sprintf("%d", qn
) // string queue number
// NOTE(review): pbc is not referenced in the visible lines; presumably it seeds
// maxlen (the payload print limit used below) - confirm.
120 pbc
:= sngecomm
.Pbc()
121 id
:= stompngo
.Uuid() // A unique subscription ID
123 d
:= senv
.Dest() + "." + qns
124 ll
.Printf("%s id:%s recv_queue_name:%s qn:%s\n", exampid
, id
, d
, qns
)
// Subscribe using the configured ack mode. sc is the value returned by
// HandleSubscribe; its len and cap are logged on every loop iteration.
126 sc
:= sngecomm
.HandleSubscribe(conn
, d
, id
, sngecomm
.AckMode())
// Timer created with a 100-hour duration.
// NOTE(review): the statements that use tmr are not visible here; presumably
// it is reset to dt to implement the stagger delay - confirm.
128 tmr
:= time
.NewTimer(100 * time
.Hour
)
129 var md stompngo
.MessageData
131 for i
:= 1; i
<= mc
; i
++ {
132 ll
.Printf("%s id:%s recv_ranchek qn:%d chlen:%d chcap:%d\n", exampid
, id
,
133 qn
, len(sc
), cap(sc
))
// A frame delivered on the connection-level channel conn.MessageData is
// treated as fatal. The other select cases are not visible in this listing.
137 case md
= <-conn
.MessageData
:
138 // A RECEIPT or ERROR frame is unexpected here
139 ll
.Fatalln(exampid
, md
) // Handle this
// A receive error terminates the whole program (Fatalln).
// NOTE(review): the guarding condition (listing line 141) is not visible here.
142 ll
.Fatalln(exampid
, id
, "recv error", md
.Error
, qn
)
145 // Process the inbound message .................
146 ll
.Printf("%s id:%s recv_message qn:%d msgnum:%d\n", exampid
, id
, qn
, i
)
// Clamp maxlen to the body length so the slice expression below stays in range.
// NOTE(review): maxlen's declaration is not visible; if it is declared outside
// the loop, a short body would lower the limit for all later messages - confirm.
149 if len(md
.Message
.Body
) < maxlen
{
150 maxlen
= len(md
.Message
.Body
)
152 ss
:= string(md
.Message
.Body
[0:maxlen
])
153 ll
.Printf("%s Payload: %s\n", exampid
, ss
) // Data payload
156 // Sanity check the message Command, and the queue and message numbers
157 mns
:= fmt
.Sprintf("%d", i
) // message number
// Anything other than a MESSAGE frame is fatal.
158 if md
.Message
.Command
!= stompngo
.MESSAGE
{
159 ll
.Fatalln(exampid
, "Bad Frame", md
, qn
, mns
)
// The headers must carry this receiver's queue number and the loop counter as
// the message number, i.e. messages are expected in send order, one per pass.
161 if !md
.Message
.Headers
.ContainsKV("qnum", qns
) ||
!md
.Message
.Headers
.ContainsKV("msgnum", mns
) {
162 ll
.Fatalln(exampid
, "Bad Headers", md
.Message
.Headers
, qn
, mns
)
// Stagger duration derived from min, max and the receive factor rf.
// NOTE(review): the exact semantics of ValueBetween are not shown in this file.
166 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, rf
))
167 ll
.Printf("%s recv_stagger dt:%v qn:%d id:%s\n",
175 // Handle ACKs if needed
// Explicit ACK is sent for every ack mode other than "auto".
176 if sngecomm
.AckMode() != "auto" {
177 sngecomm
.HandleAck(conn
, md
.Message
.Headers
, id
)
// Drop the subscription for destination d / subscription id.
181 sngecomm
.HandleUnsubscribe(conn
, d
, id
)
184 ll
.Printf("%s id:%s recv_end_queue_number qn:%d\n", exampid
, id
, qn
)
189 Start all sender goroutines.
// startSenders logs its start, reads the per-queue message count from
// senv.Nmsgs(), iterates over queue numbers 1..nqs, and logs its end.
//
//	nqs: number of queues to service.
//
// NOTE(review): the loop body and the function tail (listing lines 198-202 and
// 204 onward) are not visible in this listing; presumably one sender is started
// per queue - confirm. Only the visible statements are reproduced here.
191 func startSenders(nqs
int) {
192 ll
.Printf("%s startSenders_starts nqs:%d\n", exampid
, nqs
)
194 mc
:= senv
.Nmsgs() // message count
196 ll
.Printf("%s startSenders_message_count mc:%d nqs:%d\n", exampid
, mc
, nqs
)
197 for i
:= 1; i
<= nqs
; i
++ { // all queues
// Log tag corrected from "startSenders_endsexampid": a stray "exampid" had been
// fused into the literal. It now parallels "startReceivers_ends".
203 ll
.Printf("%s startSenders_ends nqs:%d\n", exampid
, nqs
)
208 Start all receiver goroutines.
// startReceivers logs its start, reads the per-queue message count from
// senv.Nmsgs(), iterates over queue numbers 1..nqs, and logs its end.
//
//	nqs: number of queues to service.
//
// NOTE(review): the loop body and the function tail (listing lines 216-220 and
// 222 onward) are not visible in this listing; presumably one receiver is
// started per queue - confirm. Only the visible statements are reproduced here.
210 func startReceivers(nqs
int) {
// The format has exactly two verbs (%s, %d), so exactly two arguments are
// passed. A leftover "startReceivers starts" string argument used to sit
// between them, which made %d format a string and left nqs as an EXTRA arg.
211 ll
.Printf("%s startReceivers_starts nqs:%d\n", exampid
, nqs
)
213 mc
:= senv
.Nmsgs() // get message count
214 ll
.Printf("%s startReceivers_message_count mc:%d nqs:%d\n", exampid
, mc
, nqs
)
215 for i
:= 1; i
<= nqs
; i
++ { // all queues
221 ll
.Printf("%s startReceivers_ends nqs:%d\n", exampid
, nqs
)
225 // Show a number of writers and readers operating concurrently from unique
// Body of the program entry point: show run parameters, optionally start
// profiling and raise GOMAXPROCS, load the stagger settings, open the TCP and
// STOMP connections, start the receivers, then disconnect and report the
// elapsed time.
//
// NOTE(review): the function header and a number of interior lines (error
// guards, the sender start-up, the wait on the workers, the network close, the
// closing braces) are not visible in this listing. The comments below describe
// only the visible statements.
228 sngecomm
.ShowRunParms(exampid
)
// Profiling is enabled only when sngecomm.Pprof() reports true.
230 if sngecomm
.Pprof() {
231 cfg
:= profile
.Config
{
235 NoShutdownHook
: false, // Hook SIGINT
// Stop is deferred, so it runs when the enclosing function returns.
// NOTE(review): the ll.Fatalln calls below exit via os.Exit, so this deferred
// Stop does not run on those paths.
237 defer profile
.Start(&cfg
).Stop()
241 ll
.Println(exampid
, "main starts")
242 ll
.Println(exampid
, "main profiling", sngecomm
.Pprof())
// GOMAXPROCS(-1) only queries the current setting; it does not change it.
243 ll
.Println(exampid
, "main current number of GOMAXPROCS is:", runtime
.GOMAXPROCS(-1))
// Optionally set GOMAXPROCS to the number of CPUs and log before/after values.
244 if sngecomm
.SetMAXPROCS() {
245 nc
:= runtime
.NumCPU()
246 ll
.Println(exampid
, "main number of CPUs is:", nc
)
247 gmp
:= runtime
.GOMAXPROCS(nc
)
248 ll
.Println(exampid
, "main previous number of GOMAXPROCS is:", gmp
)
249 ll
.Println(exampid
, "main current number of GOMAXPROCS is:", runtime
.GOMAXPROCS(-1))
// Stagger settings. These are plain assignments (=), so sw, rw, sf and rf are
// declared elsewhere; sf and rf are read by sender and receiver respectively.
252 sw
= sngecomm
.SendWait()
253 rw
= sngecomm
.RecvWait()
254 sf
= sngecomm
.SendFactor()
255 rf
= sngecomm
.RecvFactor()
256 ll
.Println(exampid
, "main Sleep Factors", "send", sf
, "recv", rf
)
// Number of queues, passed to startReceivers below.
258 nqs
:= sngecomm
.Nqs()
259 // Open net and stomp connections
260 h
, p
:= senv
.HostAndPort() // network connection host and port
// Assigns the shared variable n (plain =, not :=).
263 n
, e
= net
.Dial("tcp", net
.JoinHostPort(h
, p
))
// NOTE(review): the guarding condition (listing line 264) is not visible here.
265 ll
.Fatalln(exampid
, "main dial error", e
) // Handle this ......
267 // Stomp connect, 1.1(+)
268 ch
:= sngecomm
.ConnectHeaders()
269 ll
.Println(exampid
, "vhost:", senv
.Vhost(), "protocol:", senv
.Protocol())
// Assigns the shared variable conn used by sender and receiver.
270 conn
, e
= stompngo
.Connect(n
, ch
)
// NOTE(review): the guarding condition (listing line 271) is not visible here.
272 ll
.Fatalln(exampid
, "main connect error", e
) // Handle this ......
275 // Many receivers running under the same connection can cause
276 // (wire read) performance issues. This is *very* dependent on the broker
277 // being used, specifically the broker's algorithm for putting messages on
279 // To alleviate those issues, this strategy ensures that messages are
280 // received from the wire as soon as possible. Those messages are then
281 // buffered internally for (possibly later) application processing. In
282 // this example, buffering occurs in the stompngo package.
283 conn
.SetSubChanCap(senv
.SubChanCap()) // Experiment with this value, YMMV
// Receivers are started in their own goroutine.
// NOTE(review): the matching sender start-up and the wait for completion
// (listing lines 284-286 and 288-290) are not visible here.
287 go startReceivers(nqs
)
291 // Disconnect from Stomp server
292 e
= conn
.Disconnect(stompngo
.Headers
{})
// NOTE(review): the guarding condition (listing line 293) is not visible here.
294 ll
.Fatalln(exampid
, "main disconnect error", e
) // Handle this ......
// NOTE(review): the network close call and its guard (listing lines 295-298)
// are not visible here.
299 ll
.Fatalln(exampid
, "main netclose error", e
) // Handle this ......
301 sngecomm
.ShowStats(exampid
, "done", conn
)
// NOTE(review): start is not assigned in the visible lines; presumably it holds
// the program start time - confirm.
302 dur
:= time
.Since(start
)
303 ll
.Println(exampid
, "main ends", dur
)