2 // Copyright © 2012-2014 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. Each sender and receiver
24 operates under a unique network connection.
28 # A few queues and a few messages:
29 STOMP_NQS=5 STOMP_NMSGS=10 go run srmgor_manyconn.go
42 "github.com/gmallard/stompngo"
43 "github.com/gmallard/stompngo_examples/sngecomm"
46 var exampid
= "srmgor_manyconn:"
48 var wgsend sync
.WaitGroup
49 var wgrecv sync
.WaitGroup
51 // We 'stagger' between each message send and message receive for a random
53 // Vary these for experimental purposes. YMMV.
54 var max
int64 = 1e9
// Max stagger time (nanoseconds)
55 var min
int64 = max
/ 10 // Min stagger time (nanoseconds)
62 var sendFact
float64 = 1.0
63 var recvFact
float64 = 1.0
68 func sendMessages(conn
*stompngo
.Connection
, qnum
int, nc net
.Conn
) {
69 qns
:= fmt
.Sprintf("%d", qnum
) // queue number
70 qp
:= sngecomm
.Dest() // queue name prefix
72 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send queue name:", q
, qnum
)
73 h
:= stompngo
.Headers
{"destination", q
,
74 "qnum", qns
} // send Headers
75 if sngecomm
.Persistent() {
76 h
= h
.Add("persistent", "true")
78 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send starts", nmsgs
, qnum
)
80 tmr
:= time
.NewTimer(100 * time
.Hour
)
82 for n
:= 1; n
<= nmsgs
; n
++ {
83 si
:= fmt
.Sprintf("%d", n
)
84 sh
:= append(h
, "msgnum", si
)
85 // Generate a message to send ...............
86 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send message", qnum
, si
)
87 e
:= conn
.Send(sh
, string(sngecomm
.Partial()))
89 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "send:", e
, nc
.LocalAddr().String(), qnum
)
95 runtime
.Gosched() // yield for this example
96 d
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sendFact
))
97 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send", "stagger", int64(d
)/1000000, "ms")
104 func receiveMessages(conn
*stompngo
.Connection
, qnum
int, nc net
.Conn
) {
105 qns
:= fmt
.Sprintf("%d", qnum
) // queue number
106 qp
:= sngecomm
.Dest() // queue name prefix
108 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv queue name:", q
, qnum
)
110 id
:= stompngo
.Uuid() // A unique subscription ID
111 r
:= sngecomm
.Subscribe(conn
, q
, id
, "auto")
113 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv starts", nmsgs
, qnum
)
115 tmr
:= time
.NewTimer(100 * time
.Hour
)
116 for n
:= 1; n
<= nmsgs
; n
++ {
119 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "recv read:", d
.Error
, nc
.LocalAddr().String(), qnum
)
122 // Sanity check the queue and message numbers
123 mns
:= fmt
.Sprintf("%d", n
) // message number
124 if !d
.Message
.Headers
.ContainsKV("qnum", qns
) ||
!d
.Message
.Headers
.ContainsKV("msgnum", mns
) {
125 log
.Fatalln("Bad Headers", d
.Message
.Headers
, qns
, mns
)
128 // Process the inbound message .................
130 if len(d
.Message
.Body
) < sl
{
131 sl
= len(d
.Message
.Body
)
133 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv message", string(d
.Message
.Body
[0:sl
]), qnum
, d
.Message
.Headers
.Value("msgnum"))
139 runtime
.Gosched() // yield for this example
140 d
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, recvFact
))
141 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv", "stagger", int64(d
)/1000000, "ms")
146 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv done:", q
)
148 sngecomm
.Unsubscribe(conn
, q
, id
)
152 func runReceiver(qnum
int) {
153 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv start for queue number", qnum
)
155 h
, p
:= sngecomm
.HostAndPort() // host and port
156 n
, e
:= net
.Dial("tcp", net
.JoinHostPort(h
, p
))
158 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "recv nectonnr:", qnum
, e
) // Handle this ......
160 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv network open complete", qnum
)
161 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv network local", n
.LocalAddr().String(), qnum
)
162 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv network remote", n
.RemoteAddr().String(), qnum
)
164 ch
:= sngecomm
.ConnectHeaders()
165 log
.Println(sngecomm
.ExampIdNow(exampid
), "recv", "vhost:", sngecomm
.Vhost(), "protocol:", sngecomm
.Protocol())
166 conn
, e
:= stompngo
.Connect(n
, ch
)
168 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "recv stompconnect:", qnum
, e
) // Handle this ......
170 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv connection complete:", qnum
)
172 conn
.SetSubChanCap(sngecomm
.SubChanCap()) // Experiment with this value, YMMV
174 receiveMessages(conn
, qnum
, n
)
175 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv receives complete:", qnum
)
176 // Disconnect from Stomp server
177 eh
:= stompngo
.Headers
{"recv_discqueue", fmt
.Sprintf("%d", qnum
)}
178 e
= conn
.Disconnect(eh
)
180 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "recv disconnects:", qnum
, e
) // Handle this ......
182 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv disconnected:", qnum
)
186 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "recv netcloser", qnum
, e
) // Handle this ......
188 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv network close complete", qnum
)
189 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "recv end for queue number", qnum
)
190 sngecomm
.ShowStats(exampid
, "recv "+fmt
.Sprintf("%d", qnum
), conn
)
194 func runSender(qnum
int) {
195 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send start for queue number", qnum
)
197 h
, p
:= sngecomm
.HostAndPort() // host and port
198 n
, e
:= net
.Dial("tcp", net
.JoinHostPort(h
, p
))
200 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "send nectonnr:", qnum
, e
) // Handle this ......
202 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send network open complete", qnum
)
203 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send network local", n
.LocalAddr().String(), qnum
)
204 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send network remote", n
.RemoteAddr().String(), qnum
)
206 ch
:= sngecomm
.ConnectHeaders()
207 log
.Println(sngecomm
.ExampIdNow(exampid
), "send", "vhost:", sngecomm
.Vhost(), "protocol:", sngecomm
.Protocol())
208 conn
, e
:= stompngo
.Connect(n
, ch
)
210 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "send stompconnect:", qnum
, e
) // Handle this ......
212 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send connection complete:", qnum
)
214 sendMessages(conn
, qnum
, n
)
215 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send sends complete:", qnum
)
216 // Disconnect from Stomp server
217 eh
:= stompngo
.Headers
{"send_discqueue", fmt
.Sprintf("%d", qnum
)}
218 e
= conn
.Disconnect(eh
)
220 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "send disconnects:", qnum
, e
) // Handle this ......
222 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send disconnected:", qnum
)
226 log
.Fatalln(sngecomm
.ExampIdNow(exampid
), "send netcloser", qnum
, e
) // Handle this ......
228 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send network close complete", qnum
)
229 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "send end for queue number", qnum
)
230 sngecomm
.ShowStats(exampid
, "send "+fmt
.Sprintf("%d", qnum
), conn
)
235 sngecomm
.ShowRunParms(exampid
)
237 if sngecomm
.Pprof() {
238 cfg
:= profile
.Config
{
242 NoShutdownHook
: false, // Hook SIGINT
244 defer profile
.Start(&cfg
).Stop()
248 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main starts")
249 if sngecomm
.SetMAXPROCS() {
250 nc
:= runtime
.NumCPU()
251 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main number of CPUs is:", nc
)
252 c
:= runtime
.GOMAXPROCS(nc
)
253 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main previous number of GOMAXPROCS is:", c
)
254 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main current number of GOMAXPROCS is:", runtime
.GOMAXPROCS(-1))
257 send_wait
= sngecomm
.SendWait()
258 recv_wait
= sngecomm
.RecvWait()
259 sendFact
= sngecomm
.SendFactor()
260 recvFact
= sngecomm
.RecvFactor()
261 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main Sleep Factors", "send", sendFact
, "recv", recvFact
)
263 numq
:= sngecomm
.Nqs()
264 nmsgs
= sngecomm
.Nmsgs() // message count
266 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main starting receivers")
267 for q
:= 1; q
<= numq
; q
++ {
271 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main started receivers")
273 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main starting senders")
274 for q
:= 1; q
<= numq
; q
++ {
278 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main started senders")
281 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main senders complete")
283 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main receivers complete")
285 fmt
.Println(sngecomm
.ExampIdNow(exampid
), "main ends", time
.Since(tn
))