Revert profiling experiment.
[stompngo_examples.git] / srmgor_manyconn / srmgor_manyconn.go
blobc971b78bb04623732e9a857a7a7ec607572e84e5
1 //
2 // Copyright © 2012-2014 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. Each sender and receiver
24 operates under a unique network connection.
26 Examples:
28 # A few queues and a few messages:
29 STOMP_NQS=5 STOMP_NMSGS=10 go run srmgor_manyconn.go
32 package main
34 import (
35 "fmt"
36 "log"
37 "net"
38 "runtime"
39 "sync"
40 "time"
42 "github.com/gmallard/stompngo"
43 "github.com/gmallard/stompngo_examples/sngecomm"
46 var exampid = "srmgor_manyconn:"
48 var wgsend sync.WaitGroup
49 var wgrecv sync.WaitGroup
51 // We 'stagger' between each message send and message receive for a random
52 // amount of time.
53 // Vary these for experimental purposes. YMMV.
54 var max int64 = 1e9 // Max stagger time (nanoseconds)
55 var min int64 = max / 10 // Min stagger time (nanoseconds)
57 // Wait flags
58 var send_wait = true
59 var recv_wait = true
61 // Sleep multipliers
62 var sendFact float64 = 1.0
63 var recvFact float64 = 1.0
65 // Number of messages
66 var nmsgs = 1
68 func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) {
69 qns := fmt.Sprintf("%d", qnum) // queue number
70 qp := sngecomm.Dest() // queue name prefix
71 q := qp + "." + qns
72 fmt.Println(sngecomm.ExampIdNow(exampid), "send queue name:", q, qnum)
73 h := stompngo.Headers{"destination", q,
74 "qnum", qns} // send Headers
75 if sngecomm.Persistent() {
76 h = h.Add("persistent", "true")
78 fmt.Println(sngecomm.ExampIdNow(exampid), "send starts", nmsgs, qnum)
80 tmr := time.NewTimer(100 * time.Hour)
81 // Send messages
82 for n := 1; n <= nmsgs; n++ {
83 si := fmt.Sprintf("%d", n)
84 sh := append(h, "msgnum", si)
85 // Generate a message to send ...............
86 fmt.Println(sngecomm.ExampIdNow(exampid), "send message", qnum, si)
87 e := conn.Send(sh, string(sngecomm.Partial()))
88 if e != nil {
89 log.Fatalln(sngecomm.ExampIdNow(exampid), "send:", e, nc.LocalAddr().String(), qnum)
91 if n == nmsgs {
92 break
94 if send_wait {
95 runtime.Gosched() // yield for this example
96 d := time.Duration(sngecomm.ValueBetween(min, max, sendFact))
97 fmt.Println(sngecomm.ExampIdNow(exampid), "send", "stagger", int64(d)/1000000, "ms")
98 tmr.Reset(d)
99 _ = <-tmr.C
104 func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) {
105 qns := fmt.Sprintf("%d", qnum) // queue number
106 qp := sngecomm.Dest() // queue name prefix
107 q := qp + "." + qns
108 fmt.Println(sngecomm.ExampIdNow(exampid), "recv queue name:", q, qnum)
109 // Subscribe
110 id := stompngo.Uuid() // A unique subscription ID
111 r := sngecomm.Subscribe(conn, q, id, "auto")
112 // Receive messages
113 fmt.Println(sngecomm.ExampIdNow(exampid), "recv starts", nmsgs, qnum)
115 tmr := time.NewTimer(100 * time.Hour)
116 for n := 1; n <= nmsgs; n++ {
117 d := <-r
118 if d.Error != nil {
119 log.Fatalln(sngecomm.ExampIdNow(exampid), "recv read:", d.Error, nc.LocalAddr().String(), qnum)
122 // Sanity check the queue and message numbers
123 mns := fmt.Sprintf("%d", n) // message number
124 if !d.Message.Headers.ContainsKV("qnum", qns) || !d.Message.Headers.ContainsKV("msgnum", mns) {
125 log.Fatalln("Bad Headers", d.Message.Headers, qns, mns)
128 // Process the inbound message .................
129 sl := 16
130 if len(d.Message.Body) < sl {
131 sl = len(d.Message.Body)
133 fmt.Println(sngecomm.ExampIdNow(exampid), "recv message", string(d.Message.Body[0:sl]), qnum, d.Message.Headers.Value("msgnum"))
134 if n == nmsgs {
135 break
138 if recv_wait {
139 runtime.Gosched() // yield for this example
140 d := time.Duration(sngecomm.ValueBetween(min, max, recvFact))
141 fmt.Println(sngecomm.ExampIdNow(exampid), "recv", "stagger", int64(d)/1000000, "ms")
142 tmr.Reset(d)
143 _ = <-tmr.C
146 fmt.Println(sngecomm.ExampIdNow(exampid), "recv done:", q)
147 // Unsubscribe
148 sngecomm.Unsubscribe(conn, q, id)
152 func runReceiver(qnum int) {
153 fmt.Println(sngecomm.ExampIdNow(exampid), "recv start for queue number", qnum)
154 // Network Open
155 h, p := sngecomm.HostAndPort() // host and port
156 n, e := net.Dial("tcp", net.JoinHostPort(h, p))
157 if e != nil {
158 log.Fatalln(sngecomm.ExampIdNow(exampid), "recv nectonnr:", qnum, e) // Handle this ......
160 fmt.Println(sngecomm.ExampIdNow(exampid), "recv network open complete", qnum)
161 fmt.Println(sngecomm.ExampIdNow(exampid), "recv network local", n.LocalAddr().String(), qnum)
162 fmt.Println(sngecomm.ExampIdNow(exampid), "recv network remote", n.RemoteAddr().String(), qnum)
163 // Stomp connect
164 ch := sngecomm.ConnectHeaders()
165 log.Println(sngecomm.ExampIdNow(exampid), "recv", "vhost:", sngecomm.Vhost(), "protocol:", sngecomm.Protocol())
166 conn, e := stompngo.Connect(n, ch)
167 if e != nil {
168 log.Fatalln(sngecomm.ExampIdNow(exampid), "recv stompconnect:", qnum, e) // Handle this ......
170 fmt.Println(sngecomm.ExampIdNow(exampid), "recv connection complete:", qnum)
172 conn.SetSubChanCap(sngecomm.SubChanCap()) // Experiment with this value, YMMV
173 // Receives
174 receiveMessages(conn, qnum, n)
175 fmt.Println(sngecomm.ExampIdNow(exampid), "recv receives complete:", qnum)
176 // Disconnect from Stomp server
177 eh := stompngo.Headers{"recv_discqueue", fmt.Sprintf("%d", qnum)}
178 e = conn.Disconnect(eh)
179 if e != nil {
180 log.Fatalln(sngecomm.ExampIdNow(exampid), "recv disconnects:", qnum, e) // Handle this ......
182 fmt.Println(sngecomm.ExampIdNow(exampid), "recv disconnected:", qnum)
183 // Network close
184 e = n.Close()
185 if e != nil {
186 log.Fatalln(sngecomm.ExampIdNow(exampid), "recv netcloser", qnum, e) // Handle this ......
188 fmt.Println(sngecomm.ExampIdNow(exampid), "recv network close complete", qnum)
189 fmt.Println(sngecomm.ExampIdNow(exampid), "recv end for queue number", qnum)
190 sngecomm.ShowStats(exampid, "recv "+fmt.Sprintf("%d", qnum), conn)
191 wgrecv.Done()
194 func runSender(qnum int) {
195 fmt.Println(sngecomm.ExampIdNow(exampid), "send start for queue number", qnum)
196 // Network Open
197 h, p := sngecomm.HostAndPort() // host and port
198 n, e := net.Dial("tcp", net.JoinHostPort(h, p))
199 if e != nil {
200 log.Fatalln(sngecomm.ExampIdNow(exampid), "send nectonnr:", qnum, e) // Handle this ......
202 fmt.Println(sngecomm.ExampIdNow(exampid), "send network open complete", qnum)
203 fmt.Println(sngecomm.ExampIdNow(exampid), "send network local", n.LocalAddr().String(), qnum)
204 fmt.Println(sngecomm.ExampIdNow(exampid), "send network remote", n.RemoteAddr().String(), qnum)
205 // Stomp connect
206 ch := sngecomm.ConnectHeaders()
207 log.Println(sngecomm.ExampIdNow(exampid), "send", "vhost:", sngecomm.Vhost(), "protocol:", sngecomm.Protocol())
208 conn, e := stompngo.Connect(n, ch)
209 if e != nil {
210 log.Fatalln(sngecomm.ExampIdNow(exampid), "send stompconnect:", qnum, e) // Handle this ......
212 fmt.Println(sngecomm.ExampIdNow(exampid), "send connection complete:", qnum)
214 sendMessages(conn, qnum, n)
215 fmt.Println(sngecomm.ExampIdNow(exampid), "send sends complete:", qnum)
216 // Disconnect from Stomp server
217 eh := stompngo.Headers{"send_discqueue", fmt.Sprintf("%d", qnum)}
218 e = conn.Disconnect(eh)
219 if e != nil {
220 log.Fatalln(sngecomm.ExampIdNow(exampid), "send disconnects:", qnum, e) // Handle this ......
222 fmt.Println(sngecomm.ExampIdNow(exampid), "send disconnected:", qnum)
223 // Network close
224 e = n.Close()
225 if e != nil {
226 log.Fatalln(sngecomm.ExampIdNow(exampid), "send netcloser", qnum, e) // Handle this ......
228 fmt.Println(sngecomm.ExampIdNow(exampid), "send network close complete", qnum)
229 fmt.Println(sngecomm.ExampIdNow(exampid), "send end for queue number", qnum)
230 sngecomm.ShowStats(exampid, "send "+fmt.Sprintf("%d", qnum), conn)
231 wgsend.Done()
234 func main() {
235 sngecomm.ShowRunParms(exampid)
237 if sngecomm.Pprof() {
238 cfg := profile.Config{
239 MemProfile: true,
240 CPUProfile: true,
241 BlockProfile: true,
242 NoShutdownHook: false, // Hook SIGINT
244 defer profile.Start(&cfg).Stop()
247 tn := time.Now()
248 fmt.Println(sngecomm.ExampIdNow(exampid), "main starts")
249 if sngecomm.SetMAXPROCS() {
250 nc := runtime.NumCPU()
251 fmt.Println(sngecomm.ExampIdNow(exampid), "main number of CPUs is:", nc)
252 c := runtime.GOMAXPROCS(nc)
253 fmt.Println(sngecomm.ExampIdNow(exampid), "main previous number of GOMAXPROCS is:", c)
254 fmt.Println(sngecomm.ExampIdNow(exampid), "main current number of GOMAXPROCS is:", runtime.GOMAXPROCS(-1))
257 send_wait = sngecomm.SendWait()
258 recv_wait = sngecomm.RecvWait()
259 sendFact = sngecomm.SendFactor()
260 recvFact = sngecomm.RecvFactor()
261 fmt.Println(sngecomm.ExampIdNow(exampid), "main Sleep Factors", "send", sendFact, "recv", recvFact)
263 numq := sngecomm.Nqs()
264 nmsgs = sngecomm.Nmsgs() // message count
266 fmt.Println(sngecomm.ExampIdNow(exampid), "main starting receivers")
267 for q := 1; q <= numq; q++ {
268 wgrecv.Add(1)
269 go runReceiver(q)
271 fmt.Println(sngecomm.ExampIdNow(exampid), "main started receivers")
273 fmt.Println(sngecomm.ExampIdNow(exampid), "main starting senders")
274 for q := 1; q <= numq; q++ {
275 wgsend.Add(1)
276 go runSender(q)
278 fmt.Println(sngecomm.ExampIdNow(exampid), "main started senders")
280 wgsend.Wait()
281 fmt.Println(sngecomm.ExampIdNow(exampid), "main senders complete")
282 wgrecv.Wait()
283 fmt.Println(sngecomm.ExampIdNow(exampid), "main receivers complete")
285 fmt.Println(sngecomm.ExampIdNow(exampid), "main ends", time.Since(tn))