A scripted update to formats of log messages:
[stompngo_examples.git] / srmgor_2conn / srmgor_2conn.go
blob17ebd7b39670613cbd3c8d8135e08b9ace65a53d
1 //
2 // Copyright © 2011-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. All senders share a single
24 STOMP connection, as do all receivers.
26 package main
28 import (
29 "flag"
30 "fmt"
31 "log"
32 "net"
33 "os"
34 "runtime"
35 "strconv"
36 "sync"
37 "time"
39 "github.com/davecheney/profile"
41 "github.com/gmallard/stompngo"
42 // senv methods could be used in general by stompngo clients.
43 "github.com/gmallard/stompngo/senv"
44 // sngecomm methods are used specifically for these example clients.
45 "github.com/gmallard/stompngo_examples/sngecomm"
48 var (
49 exampid = "srmgor_2conn:"
51 wgs sync.WaitGroup
52 wgr sync.WaitGroup
53 wga sync.WaitGroup
55 // We 'stagger' between each message send and message receive for a random
56 // amount of time.
57 // Vary these for experimental purposes. YMMV.
58 max int64 = 1e9 // Max stagger time (nanoseconds)
59 min int64 = max / 10 // Min stagger time (nanoseconds)
61 // Wait flags
62 sw = true
63 rw = true
65 // Sleep multipliers
66 sf float64 = 1.0
67 rf float64 = 1.0
69 // Possible profile file
70 cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
72 ll = log.New(os.Stdout, "E1S1R ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
75 // Send messages to a particular queue
76 func sender(conn *stompngo.Connection, qn, nmsgs int) {
77 qns := fmt.Sprintf("%d", qn) // queue number
78 ll.Printf("%s connsess:%s sender_starts qn:%d nmsgs:%d\n",
79 exampid, conn.Session(), qn, nmsgs)
81 // qp := senv.Dest() // queue name prefix
82 d := senv.Dest() + "." + qns
83 ll.Printf("%s connsess:%s sender_starts d:%s\n",
84 exampid, conn.Session(), d)
85 wh := stompngo.Headers{"destination", d,
86 "qnum", qns} // send Headers
87 if senv.Persistent() {
88 wh = wh.Add("persistent", "true")
91 tmr := time.NewTimer(100 * time.Hour)
92 // Send loop
93 for i := 1; i <= nmsgs; i++ {
94 si := fmt.Sprintf("%d", i)
95 sh := append(wh, "msgnum", si)
96 // Generate a message to send ...............
97 ll.Printf("%s connsess:%s sender_message qns:%s si:%s\n",
98 exampid, conn.Session(), qns, si)
99 e := conn.Send(sh, string(sngecomm.Partial()))
101 if e != nil {
102 ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "send error", e, qn)
103 break
105 if sw {
106 runtime.Gosched() // yield for this example
107 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
108 ll.Printf("%s connsess:%s send_stagger dt:%v qns:%s\n",
109 exampid, conn.Session(),
110 dt, qns)
111 tmr.Reset(dt)
112 _ = <-tmr.C
115 // Sending is done
116 ll.Printf("%s connsess:%s sender_ends qn:%d nmsgs:%d\n",
117 exampid, conn.Session(), qn, nmsgs)
118 wgs.Done()
121 // Asynchronously process all messages for a given subscription.
122 func receiveWorker(sc <-chan stompngo.MessageData, qns string, nmsgs int,
123 qc chan<- bool, conn *stompngo.Connection, id string) {
125 tmr := time.NewTimer(100 * time.Hour)
127 pbc := sngecomm.Pbc() // Print byte count
129 // Receive loop
130 var md stompngo.MessageData
131 for i := 1; i <= nmsgs; i++ {
133 select {
134 case md = <-sc:
135 case md = <-conn.MessageData:
136 // Frames RECEIPT or ERROR not expected here
137 ll.Fatalf("%s v1:%v\n", exampid, md) // Handle this
139 if md.Error != nil {
140 ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "recv read error", md.Error, qns)
143 // Sanity check the queue and message numbers
144 mns := fmt.Sprintf("%d", i) // message number
145 if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mns) {
146 ll.Fatalf("%s v1:%v v2:%v v3:%v v4:%v\n", exampid, "Bad Headers", md.Message.Headers, qns, mns)
149 // Process the inbound message .................
150 sl := len(md.Message.Body)
151 if pbc > 0 {
152 sl = pbc
153 if len(md.Message.Body) < sl {
154 sl = len(md.Message.Body)
158 // Handle ACKs if needed
159 if sngecomm.AckMode() != "auto" {
160 ah := []string{}
161 sngecomm.HandleAck(conn, ah, id)
163 ll.Printf("%s connsess:%s recv_message body:%s qns:%s msgnum:%s\n",
164 exampid, conn.Session(),
165 string(md.Message.Body[0:sl]),
166 qns,
167 md.Message.Headers.Value("msgnum"))
168 if i == nmsgs {
169 break
171 if rw {
172 runtime.Gosched() // yield for this example
173 dt := time.Duration(sngecomm.ValueBetween(min, max, rf))
174 ll.Printf("%s connsess:%s recv_stagger dt:%v qns:%s\n",
175 exampid, conn.Session(),
176 dt, qns)
177 tmr.Reset(dt)
178 _ = <-tmr.C
182 qc <- true
185 // Receive messages from a particular queue
186 func receiver(conn *stompngo.Connection, qn, nmsgs int) {
187 qns := fmt.Sprintf("%d", qn) // queue number
188 ll.Printf("%s connsess:%s recveiver_starts qns:%d nmsgs:%d\n",
189 exampid, conn.Session(), qn, nmsgs)
191 qp := senv.Dest() // queue name prefix
192 q := qp + "." + qns
193 ll.Printf("%s connsess:%s recveiver_names q:%s qn:%d\n",
194 exampid, conn.Session(), q, qn)
195 id := stompngo.Uuid() // A unique subscription ID
196 sc := sngecomm.HandleSubscribe(conn, q, id, sngecomm.AckMode())
197 // Many receivers running under the same connection can cause
198 // (wire read) performance issues. This is *very* dependent on the broker
199 // being used, specifically the broker's algorithm for putting messages on
200 // the wire.
201 // To alleviate those issues, this strategy insures that messages are
202 // received from the wire as soon as possible. Those messages are then
203 // buffered internally for (possibly later) application processing.
205 bs := -1 //
206 if s := os.Getenv("STOMP_CONN2BUFFER"); s != "" {
207 i, e := strconv.ParseInt(s, 10, 32)
208 if nil != e {
209 ll.Fatalf("%s v1:%v v2:%v\n", exampid, "CONN2BUFFER conversion error", e)
210 } else {
211 bs = int(i)
214 if bs < 1 {
215 bs = nmsgs
217 ll.Printf("%s connsess:%s recveiver_mdbuffersize bs:%d qn:%d\n",
218 exampid, conn.Session(), bs, qn)
220 // Process all inputs async .......
221 // var mc chan stompngo.MessageData
222 mdc := make(chan stompngo.MessageData, bs) // MessageData Buffer size
223 dc := make(chan bool) // Receive processing done channel
224 go receiveWorker(mdc, qns, nmsgs, dc, conn, id) // Start async processor
225 for i := 1; i <= nmsgs; i++ {
226 mdc <- <-sc // Receive message data as soon as possible, and internally queue it
228 ll.Printf("%s connsess:%s recveiver_waitforWorkersBegin qns:%s\n",
229 exampid, conn.Session(), qns)
230 <-dc // Wait until receive processing is done for this queue
231 ll.Printf("%s connsess:%s recveiver_waitforWorkersEnd qns:%s\n",
232 exampid, conn.Session(), qns)
234 // Unsubscribe
235 sngecomm.HandleUnsubscribe(conn, q, id)
237 // Receiving is done
238 ll.Printf("%s connsess:%s recveiver_ends qns:%s\n",
239 exampid, conn.Session(), qns)
240 wgr.Done()
243 func startSenders(qn int) {
244 ll.Printf("%s startSenders_starts qn:%d\n",
245 exampid, qn)
247 // Open
248 h, p := senv.HostAndPort() // host and port
249 hap := net.JoinHostPort(h, p)
250 n, e := net.Dial("tcp", hap)
251 if e != nil {
252 ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "startSenders netconnect error", e, qn) // Handle this ......
255 // Stomp connect
256 ch := sngecomm.ConnectHeaders()
257 ll.Printf("%s startSenders_sdata vhost:%s protocol:%s qn:%d\n",
258 exampid, senv.Vhost(), senv.Protocol(), qn)
259 conn, e := stompngo.Connect(n, ch)
260 if e != nil {
261 ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "startSenders stompconnect error", e, qn) // Handle this ......
263 ll.Printf("%s connsess:%s startSenders_connection qn:%d\n",
264 exampid, conn.Session(), qn)
265 nmsgs := senv.Nmsgs() // message count
266 ll.Printf("%s connsess:%s startSenders_message_count nmsgs:%d qn:%d\n",
267 exampid, conn.Session(), nmsgs, qn)
268 for i := 1; i <= qn; i++ { // all queues
269 wgs.Add(1)
270 go sender(conn, i, nmsgs)
272 wgs.Wait()
274 // Disconnect from Stomp server
275 e = conn.Disconnect(stompngo.Headers{})
276 if e != nil {
277 ll.Printf("%s v1:%v v2:%v v3:%v\n", exampid, "startSenders disconnect error", e, qn) // Handle this ......
279 // Network close
280 e = n.Close()
281 if e != nil {
282 ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "startSenders netclose error", e, qn) // Handle this ......
285 ll.Printf("%s startSenders_ends qn:%d\n",
286 exampid, qn)
287 sngecomm.ShowStats(exampid, "startSenders", conn)
288 wga.Done()
291 func startReceivers(qn int) {
292 ll.Printf("%s startReceivers_starts qn:%d\n",
293 exampid, qn)
295 // Open
296 h, p := senv.HostAndPort() // host and port
297 n, e := net.Dial("tcp", net.JoinHostPort(h, p))
298 if e != nil {
299 ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "startReceivers nectonnr:", e, qn) // Handle this ......
301 ch := sngecomm.ConnectHeaders()
302 ll.Printf("%s startReceivers_sdata vhost:%s protocol:%s qn:%dn",
303 exampid, senv.Vhost(), senv.Protocol(), qn)
304 conn, e := stompngo.Connect(n, ch)
305 if e != nil {
306 ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "startReceivers stompconnectr:", e, qn) // Handle this ......
308 ll.Printf("%s connsess:%s startReceivers_conndata qn:%d\n",
309 exampid, conn.Session(), qn)
310 nmsgs := senv.Nmsgs() // get message count
311 ll.Printf("%s connsess:%s startReceivers_message_count nmsgs:%d qn:%d\n",
312 exampid, conn.Session(), nmsgs, qn)
313 for i := 1; i <= qn; i++ { // all queues
314 wgr.Add(1)
315 go receiver(conn, i, nmsgs)
317 wgr.Wait()
319 // Disconnect from Stomp server
320 e = conn.Disconnect(stompngo.Headers{})
321 if e != nil {
322 ll.Printf("%s v1:%v v2:%v v3:%v\n", exampid, "startReceivers disconnect error", e, qn) // Handle this ......
324 // Network close
325 e = n.Close()
326 if e != nil {
327 ll.Printf("%s v1:%v v2:%v v3:%v\n", exampid, "startReceivers netclose error", e, qn) // Handle this ......
330 ll.Printf("%s startReceivers_ends qn:%d\n",
331 exampid, qn)
332 sngecomm.ShowStats(exampid, "startReceivers", conn)
333 wga.Done()
336 // Show a number of writers and readers operating concurrently from unique
337 // destinations.
338 func main() {
339 sngecomm.ShowRunParms(exampid)
341 if sngecomm.Pprof() {
342 cfg := profile.Config{
343 MemProfile: true,
344 CPUProfile: true,
345 BlockProfile: true,
346 NoShutdownHook: false, // Hook SIGINT
348 defer profile.Start(&cfg).Stop()
351 tn := time.Now()
352 ll.Printf("%s v1:%v\n", exampid, "main starts")
354 if sngecomm.SetMAXPROCS() {
355 nc := runtime.NumCPU()
356 ll.Printf("%s v1:%v v2:%v\n", exampid, "main number of CPUs is:", nc)
357 c := runtime.GOMAXPROCS(nc)
358 ll.Printf("%s v1:%v v2:%v\n", exampid, "main previous number of GOMAXPROCS is:", c)
359 ll.Printf("%s v1:%v v2:%v\n", exampid, "main current number of GOMAXPROCS is:", runtime.GOMAXPROCS(-1))
362 sw = sngecomm.SendWait()
363 rw = sngecomm.RecvWait()
364 sf = sngecomm.SendFactor()
365 rf = sngecomm.RecvFactor()
366 ll.Printf("%s v1:%v v2:%v v3:%v v4:%v v5:%v\n", exampid, "main Sleep Factors", "send", sf, "recv", rf)
368 q := sngecomm.Nqs()
370 wga.Add(2)
371 go startReceivers(q)
372 go startSenders(q)
373 wga.Wait()
375 ll.Printf("%s v1:%v v2:%v\n", exampid, "main ends", time.Since(tn))