Merge branch 'refactor_enhance' into dev
[stompngo_examples.git] / srmgor_manyconn / srmgor_manyconn.go
blobf2d0e426a89b9ac30a2a839ccc6945f5c08a1e8c
1 //
2 // Copyright © 2012-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
/*
Send and receive many STOMP messages using multiple queues, with one goroutine
servicing each sender and each receiver. Every sender and every receiver
operates over its own network connection.

Examples:

	# A few queues and a few messages:
	STOMP_NQS=5 STOMP_NMSGS=10 go run srmgor_manyconn.go
*/
32 package main
34 import (
35 "fmt"
36 "log"
37 "net"
38 "os"
39 "runtime"
40 "sync"
41 "time"
43 "github.com/davecheney/profile"
45 "github.com/gmallard/stompngo"
46 // senv methods could be used in general by stompngo clients.
47 "github.com/gmallard/stompngo/senv"
48 // sngecomm methods are used specifically for these example clients.
49 "github.com/gmallard/stompngo_examples/sngecomm"
52 var (
53 exampid = "srmgor_manyconn: "
55 wgs sync.WaitGroup
56 wgr sync.WaitGroup
58 // We 'stagger' between each message send and message receive for a random
59 // amount of time.
60 // Vary these for experimental purposes. YMMV.
61 max int64 = 1e9 // Max stagger time (nanoseconds)
62 min int64 = max / 10 // Min stagger time (nanoseconds)
64 // Wait flags
65 sw = true
66 rw = true
68 // Sleep multipliers
69 sf float64 = 1.0
70 rf float64 = 1.0
72 // Number of messages
73 nmsgs = senv.Nmsgs()
75 ll = log.New(os.Stdout, "EMSMR ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
77 tag = "manyconn"
80 func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) {
81 ltag := tag + "-sendmessages"
83 qns := fmt.Sprintf("%d", qnum) // queue number
84 d := senv.Dest() + "." + qns
85 ll.Printf("%stag:%s connsess:%s start d:%s qnum:%d\n",
86 exampid, ltag, conn.Session(),
87 d, qnum)
88 wh := stompngo.Headers{"destination", d,
89 "qnum", qns} // send Headers
90 if senv.Persistent() {
91 wh = wh.Add("persistent", "true")
94 tmr := time.NewTimer(100 * time.Hour)
95 // Send messages
96 for mc := 1; mc <= nmsgs; mc++ {
97 mcs := fmt.Sprintf("%d", mc)
98 sh := append(wh, "msgnum", mcs)
99 // Generate a message to send ...............
101 ll.Printf("%stag:%s connsess:%s message mc:%d qnum:%d\n",
102 exampid, ltag, conn.Session(),
103 mc, qnum)
104 e := conn.Send(sh, string(sngecomm.Partial()))
105 if e != nil {
106 ll.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
107 exampid, ltag, conn.Session(),
108 qnum, e.Error()) // Handle this ......
110 if mc == nmsgs {
111 break
113 if sw {
114 runtime.Gosched() // yield for this example
115 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
116 ll.Printf("%stag:%s connsess:%s send_stagger dt:%v qnum:%s mc:%d\n",
117 exampid, ltag, conn.Session(),
118 dt, qnum, mc)
119 tmr.Reset(dt)
120 _ = <-tmr.C
125 func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) {
126 ltag := tag + "-receivemessages"
128 qns := fmt.Sprintf("%d", qnum) // queue number
129 d := senv.Dest() + "." + qns
130 id := stompngo.Uuid() // A unique subscription ID
132 ll.Printf("%stag:%s connsess:%s receiveMessages_start id:%s d:%s qnum:%d nmsgs:%d\n",
133 exampid, ltag, conn.Session(),
134 id, d, qnum, nmsgs)
135 // Subscribe
136 sc := sngecomm.HandleSubscribe(conn, d, id, sngecomm.AckMode())
138 pbc := sngecomm.Pbc() // Print byte count
141 tmr := time.NewTimer(100 * time.Hour)
142 var md stompngo.MessageData
143 for mc := 1; mc <= nmsgs; mc++ {
145 select {
146 case md = <-sc:
147 case md = <-conn.MessageData:
148 // Frames RECEIPT or ERROR not expected here
149 ll.Fatalf("%stag:%s connsess:%s send_error qns:%v md:%v",
150 exampid, ltag, conn.Session(),
151 qns, md) // Handle this ......
153 if md.Error != nil {
154 ll.Fatalf("%stag:%s connsess:%s receive_error qns:%v error:%v\n",
155 exampid, ltag, conn.Session(),
156 qns, md.Error)
159 if md.Message.Command != stompngo.MESSAGE {
160 ll.Fatalf("%stag:%s connsess:%s bad_frame qns:%s mc:%d md:%v\n",
161 exampid, ltag, conn.Session(),
162 qns, mc, md)
165 mcs := fmt.Sprintf("%d", mc) // message number
166 if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mcs) {
167 ll.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v md:%v",
168 exampid, tag, conn.Session(),
169 qns, mcs, md) // Handle this ......
172 // Process the inbound message .................
173 sl := len(md.Message.Body)
174 if pbc > 0 {
175 sl = pbc
176 if len(md.Message.Body) < sl {
177 sl = len(md.Message.Body)
180 ll.Printf("%stag:%s connsess:%s receiveMessages_msg d:%s body:%s qnum:%d msgnum:%s\n",
181 exampid, ltag, conn.Session(),
182 d, string(md.Message.Body[0:sl]), qnum,
183 md.Message.Headers.Value("msgnum"))
184 if mc == nmsgs {
185 break
187 // Handle ACKs if needed
188 if sngecomm.AckMode() != "auto" {
189 ah := []string{}
190 sngecomm.HandleAck(conn, ah, id)
192 if mc == nmsgs {
193 break
196 if rw {
197 runtime.Gosched() // yield for this example
198 dt := time.Duration(sngecomm.ValueBetween(min, max, rf))
199 ll.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s mc:%d\n",
200 exampid, ltag, conn.Session(),
201 dt, qns, mc)
202 tmr.Reset(dt)
203 _ = <-tmr.C
206 ll.Printf("%stag:%s connsess:%s end d:%s qnum:%d nmsgs:%d\n",
207 exampid, ltag, conn.Session(),
208 d, qnum, nmsgs)
210 // Unsubscribe
211 sngecomm.HandleUnsubscribe(conn, d, id)
215 func runReceiver(qnum int) {
216 ltag := tag + "-runreceiver"
218 ll.Printf("%stag:%s connsess:%s start qnum:%d\n",
219 exampid, ltag, sngecomm.Lcs,
220 qnum)
222 // Standard example connect sequence
223 n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll)
224 if e != nil {
225 ll.Fatalf("%stag:%s connsess:%s error:%s\n",
226 exampid, ltag, sngecomm.Lcs,
227 e.Error()) // Handle this ......
231 conn.SetSubChanCap(senv.SubChanCap()) // Experiment with this value, YMMV
232 // Receives
233 receiveMessages(conn, qnum, n)
235 ll.Printf("%stag:%s connsess:%s receives_complete qnum:%d\n",
236 exampid, ltag, conn.Session(),
237 qnum)
239 // Standard example disconnect sequence
240 e = sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll)
241 if e != nil {
242 ll.Fatalf("%stag:%s connsess:%s error:%s\n",
243 exampid, ltag, conn.Session(),
244 e.Error()) // Handle this ......
247 sngecomm.ShowStats(exampid, "recv_"+fmt.Sprintf("%d", qnum), conn)
248 wgr.Done()
251 func runSender(qnum int) {
253 ltag := tag + "-runsender"
255 ll.Printf("%stag:%s connsess:%s start qnum:%d\n",
256 exampid, ltag, sngecomm.Lcs,
257 qnum)
258 // Standard example connect sequence
259 n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll)
260 if e != nil {
261 ll.Fatalf("%stag:%s connsess:%s error:%s\n",
262 exampid, ltag, sngecomm.Lcs,
263 e.Error()) // Handle this ......
267 sendMessages(conn, qnum, n)
269 ll.Printf("%stag:%s connsess:%s sends_complete qnum:%d\n",
270 exampid, ltag, conn.Session(),
271 qnum)
273 // Standard example disconnect sequence
274 e = sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll)
275 if e != nil {
276 ll.Fatalf("%stag:%s connsess:%s error:%s\n",
277 exampid, ltag, conn.Session(),
278 e.Error()) // Handle this ......
280 sngecomm.ShowStats(exampid, "send_"+fmt.Sprintf("%d", qnum), conn)
281 wgs.Done()
284 func main() {
286 st := time.Now()
288 sngecomm.ShowRunParms(exampid)
290 if sngecomm.Pprof() {
291 cfg := profile.Config{
292 MemProfile: true,
293 CPUProfile: true,
294 BlockProfile: true,
295 NoShutdownHook: false, // Hook SIGINT
297 defer profile.Start(&cfg).Stop()
300 ll.Printf("%stag:%s connsess:%s main_starts\n",
301 exampid, tag, sngecomm.Lcs)
303 ll.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
304 exampid, tag, sngecomm.Lcs,
305 sngecomm.Pprof())
307 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
308 exampid, tag, sngecomm.Lcs,
309 runtime.GOMAXPROCS(-1))
311 if sngecomm.SetMAXPROCS() {
312 nc := runtime.NumCPU()
313 ll.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
314 exampid, tag, sngecomm.Lcs,
316 gmp := runtime.GOMAXPROCS(nc)
317 ll.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
318 exampid, tag, sngecomm.Lcs,
319 gmp)
320 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
321 exampid, tag, sngecomm.Lcs,
322 runtime.GOMAXPROCS(-1))
325 sw = sngecomm.SendWait()
326 rw = sngecomm.RecvWait()
327 sf = sngecomm.SendFactor()
328 rf = sngecomm.RecvFactor()
329 ll.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
330 exampid, tag, sngecomm.Lcs,
331 sw, rw, sf, rf)
333 numq := sngecomm.Nqs()
334 nmsgs = senv.Nmsgs() // message count
336 ll.Printf("%stag:%s connsess:%s main_starting_receivers\n",
337 exampid, tag, sngecomm.Lcs)
338 for q := 1; q <= numq; q++ {
339 wgr.Add(1)
340 go runReceiver(q)
342 ll.Printf("%stag:%s connsess:%s main_started_receivers\n",
343 exampid, tag, sngecomm.Lcs)
345 ll.Printf("%stag:%s connsess:%s main_starting_senders\n",
346 exampid, tag, sngecomm.Lcs)
347 for q := 1; q <= numq; q++ {
348 wgs.Add(1)
349 go runSender(q)
351 ll.Printf("%stag:%s connsess:%s main_started_senders\n",
352 exampid, tag, sngecomm.Lcs)
354 wgs.Wait()
355 ll.Printf("%stag:%s connsess:%s main_senders_complete\n",
356 exampid, tag, sngecomm.Lcs)
357 wgr.Wait()
358 ll.Printf("%stag:%s connsess:%s main_receivers_complete\n",
359 exampid, tag, sngecomm.Lcs)
362 // The end
363 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
364 exampid, tag, sngecomm.Lcs,
365 time.Now().Sub(st))