Changes to publish.go:
[stompngo_examples.git] / srmgor_2conn / srmgor_2conn.go
blob185b3479bbf26551e29e80cc33a61e4c403f9649
1 //
2 // Copyright © 2011-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. All senders share a single
24 STOMP connection, as do all receivers.
26 package main
28 import (
29 "flag"
30 "fmt"
31 "log"
32 "os"
33 "runtime"
34 "strconv"
35 "sync"
36 "time"
38 "github.com/gmallard/stompngo"
39 // senv methods could be used in general by stompngo clients.
40 "github.com/gmallard/stompngo/senv"
41 // sngecomm methods are used specifically for these example clients.
42 "github.com/gmallard/stompngo_examples/sngecomm"
45 var (
46 exampid = "srmgor_2conn: "
48 wgs sync.WaitGroup
49 wgr sync.WaitGroup
50 wga sync.WaitGroup
52 // We 'stagger' between each message send and message receive for a random
53 // amount of time.
54 // Vary these for experimental purposes. YMMV.
55 max int64 = 1e9 // Max stagger time (nanoseconds)
56 min int64 = max / 10 // Min stagger time (nanoseconds)
58 // Wait flags
59 sw = true
60 rw = true
62 // Sleep multipliers
63 sf float64 = 1.0
64 rf float64 = 1.0
66 // Possible profile file
67 cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
69 ll = log.New(os.Stdout, "E1S1R ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
71 tag = "2conn"
74 // Send messages to a particular queue
75 func sender(conn *stompngo.Connection, qn, nmsgs int) {
76 ltag := tag + "-sender"
78 qns := fmt.Sprintf("%d", qn) // queue number
79 d := sngecomm.Dest() + "." + string(exampid[:len(exampid)-2]) + "." + qns
80 ll.Printf("%stag:%s connsess:%s starts qn:%d nmsgs:%d d:%s\n",
81 exampid, ltag, conn.Session(),
82 qn, nmsgs, d)
84 wh := stompngo.Headers{"destination", d,
85 "qnum", qns} // send Headers
86 if senv.Persistent() {
87 wh = wh.Add("persistent", "true")
90 tmr := time.NewTimer(100 * time.Hour)
91 // Send loop
92 for i := 1; i <= nmsgs; i++ {
93 si := fmt.Sprintf("%d", i)
94 sh := append(wh, "msgnum", si)
95 // Generate a message to send ...............
96 ll.Printf("%stag:%s connsess:%s message qns:%s si:%s\n",
97 exampid, ltag, conn.Session(),
98 qns, si)
99 e := conn.Send(sh, string(sngecomm.Partial()))
100 if e != nil {
101 ll.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
102 exampid, ltag, conn.Session(),
103 qn, e.Error()) // Handle this ......
105 if i == nmsgs {
106 break
108 if sw {
109 runtime.Gosched() // yield for this example
110 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
111 ll.Printf("%stag:%s connsess:%s send_stagger dt:%v qns:%s\n",
112 exampid, ltag, conn.Session(),
113 dt, qns)
114 tmr.Reset(dt)
115 _ = <-tmr.C
118 // Sending is done
119 ll.Printf("%stag:%s connsess:%s sender_ends qn:%d nmsgs:%d\n",
120 exampid, ltag, conn.Session(),
121 qn, nmsgs)
122 wgs.Done()
125 // Asynchronously process all messages for a given subscription.
126 func receiveWorker(sc <-chan stompngo.MessageData, qns string, nmsgs int,
127 qc chan<- bool, conn *stompngo.Connection, id string) {
129 ltag := tag + "-receiveWorker"
131 tmr := time.NewTimer(100 * time.Hour)
133 pbc := sngecomm.Pbc() // Print byte count
135 // Receive loop
136 var md stompngo.MessageData
137 for i := 1; i <= nmsgs; i++ {
139 select {
140 case md = <-sc:
141 case md = <-conn.MessageData:
142 // Frames RECEIPT or ERROR not expected here
143 ll.Fatalf("%stag:%s connsess:%s bad_frame qns:%v md:%v",
144 exampid, ltag, conn.Session(),
145 qns, md) // Handle this ......
147 if md.Error != nil {
148 ll.Fatalf("%stag:%s connsess:%s recv_error qns:%v error:%v",
149 exampid, ltag, conn.Session(),
150 qns, md.Error) // Handle this ......
153 // Sanity check the queue and message numbers
154 mns := fmt.Sprintf("%d", i) // message number
155 if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mns) {
156 ll.Fatalf("%stag:%s connsess:%s dirty_message qnum:%v msgnum:%v md:%v",
157 exampid, ltag, conn.Session(),
158 qns, mns, md) // Handle this ......
161 // Process the inbound message .................
162 sl := len(md.Message.Body)
163 if pbc > 0 {
164 sl = pbc
165 if len(md.Message.Body) < sl {
166 sl = len(md.Message.Body)
170 // Handle ACKs if needed
171 if sngecomm.AckMode() != "auto" {
172 ah := []string{}
173 sngecomm.HandleAck(conn, ah, id)
175 ll.Printf("%stag:%s connsess:%s recv_message body:%s qns:%s msgnum:%s i:%v\n",
176 exampid, ltag, conn.Session(),
177 string(md.Message.Body[0:sl]),
178 qns,
179 md.Message.Headers.Value("msgnum"), i)
180 if i == nmsgs {
181 break
183 if rw {
184 runtime.Gosched() // yield for this example
185 dt := time.Duration(sngecomm.ValueBetween(min, max, rf))
186 ll.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s\n",
187 exampid, ltag, conn.Session(),
188 dt, qns)
189 tmr.Reset(dt)
190 _ = <-tmr.C
194 qc <- true
197 // Receive messages from a particular queue
198 func receiver(conn *stompngo.Connection, qn, nmsgs int) {
199 ltag := tag + "-receiver"
201 qns := fmt.Sprintf("%d", qn) // queue number
202 ll.Printf("%stag:%s connsess:%s starts qns:%d nmsgs:%d\n",
203 exampid, ltag, conn.Session(),
204 qn, nmsgs)
206 qp := sngecomm.Dest() // queue name prefix
207 q := qp + "." + string(exampid[:len(exampid)-2]) + "." + qns
208 ll.Printf("%stag:%s connsess:%s queue_info q:%s qn:%d nmsgs:%d\n",
209 exampid, ltag, conn.Session(),
210 q, qn, nmsgs)
211 id := stompngo.Uuid() // A unique subscription ID
212 sc := sngecomm.HandleSubscribe(conn, q, id, sngecomm.AckMode())
213 ll.Printf("%stag:%s connsess:%s subscribe_complete\n",
214 exampid, ltag, conn.Session())
215 // Many receivers running under the same connection can cause
216 // (wire read) performance issues. This is *very* dependent on the broker
217 // being used, specifically the broker's algorithm for putting messages on
218 // the wire.
219 // To alleviate those issues, this strategy insures that messages are
220 // received from the wire as soon as possible. Those messages are then
221 // buffered internally for (possibly later) application processing.
223 bs := -1 //
224 if s := os.Getenv("STOMP_CONN2BUFFER"); s != "" {
225 i, e := strconv.ParseInt(s, 10, 32)
226 if nil != e {
227 ll.Fatalf("%stag:%s connsess:%s CONN2BUFFER_conversion_error error:%v",
228 exampid, ltag, conn.Session(),
229 e.Error()) // Handle this ......
231 } else {
232 bs = int(i)
235 if bs < 1 {
236 bs = nmsgs
238 ll.Printf("%stag:%s connsess:%s mdbuffersize_qnum bs:%d qn:%d\n",
239 exampid, ltag, conn.Session(),
240 bs, qn)
242 // Process all inputs async .......
243 // var mc chan stompngo.MessageData
244 mdc := make(chan stompngo.MessageData, bs) // MessageData Buffer size
245 dc := make(chan bool) // Receive processing done channel
246 go receiveWorker(mdc, qns, nmsgs, dc, conn, id) // Start async processor
247 for i := 1; i <= nmsgs; i++ {
248 mdc <- <-sc // Receive message data as soon as possible, and internally queue it
250 ll.Printf("%stag:%s connsess:%s waitforWorkersBegin qns:%s\n",
251 exampid, ltag, conn.Session(),
252 qns)
253 <-dc // Wait until receive processing is done for this queue
254 ll.Printf("%stag:%s connsess:%s waitforWorkersEnd qns:%s\n",
255 exampid, ltag, conn.Session(),
256 qns)
258 // Unsubscribe
259 sngecomm.HandleUnsubscribe(conn, q, id)
260 ll.Printf("%stag:%s connsess:%s unsubscribe_complete\n",
261 exampid, ltag, conn.Session())
263 // Receiving is done
264 ll.Printf("%stag:%s connsess:%s ends qns:%s\n",
265 exampid, ltag, conn.Session(),
266 qns)
267 wgr.Done()
270 func startSenders(qn int) {
271 ltag := tag + "-startsenders"
273 ll.Printf("%stag:%s connsess:%s queue qn:%v\n",
274 exampid, ltag, sngecomm.Lcs,
277 // Standard example connect sequence
278 n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll)
279 if e != nil {
280 ll.Fatalf("%stag:%s connsess:%s on_connect error:%v",
281 exampid, ltag, sngecomm.Lcs,
282 e.Error()) // Handle this ......
285 nmsgs := senv.Nmsgs() // message count
286 ll.Printf("%stag:%s connsess:%s message_count nmsgs:%d qn:%d\n",
287 exampid, ltag, conn.Session(),
288 nmsgs, qn)
289 for i := 1; i <= qn; i++ { // all queues
290 wgs.Add(1)
291 go sender(conn, i, nmsgs)
293 ll.Printf("%stag:%s connsess:%s starts_done\n",
294 exampid, ltag, conn.Session())
295 wgs.Wait()
297 // Standard example disconnect sequence
298 e = sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll)
299 if e != nil {
300 ll.Fatalf("%stag:%s connsess:%s on_disconnect error:%v",
301 exampid, ltag, conn.Session(),
302 e.Error()) // Handle this ......
305 sngecomm.ShowStats(exampid, ltag, conn)
306 wga.Done()
309 func startReceivers(qn int) {
310 ltag := tag + "-startreceivers"
312 ll.Printf("%stag:%s connsess:%s starts qn:%d\n",
313 exampid, ltag, sngecomm.Lcs,
316 // Standard example connect sequence
317 n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll)
318 if e != nil {
319 ll.Fatalf("%stag:%s connsess:%s on_connect error:%v",
320 exampid, ltag, sngecomm.Lcs,
321 e.Error()) // Handle this ......
324 nmsgs := senv.Nmsgs() // get message count
325 ll.Printf("%stag:%s connsess:%s message_count nmsgs:%d qn:%d\n",
326 exampid, ltag, conn.Session(),
327 nmsgs, qn)
328 for i := 1; i <= qn; i++ { // all queues
329 wgr.Add(1)
330 go receiver(conn, i, nmsgs)
332 ll.Printf("%stag:%s connsess:%s starts_done\n",
333 exampid, ltag, conn.Session())
334 wgr.Wait()
336 // Standard example disconnect sequence
337 e = sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll)
338 if e != nil {
339 ll.Fatalf("%stag:%s connsess:%s on_disconnect error:%v",
340 exampid, ltag, conn.Session(),
341 e.Error()) // Handle this ......
344 sngecomm.ShowStats(exampid, ltag, conn)
345 wga.Done()
348 // Show a number of writers and readers operating concurrently from unique
349 // destinations.
350 func main() {
352 st := time.Now()
354 sngecomm.ShowRunParms(exampid)
356 ll.Printf("%stag:%s connsess:%s main_starts\n",
357 exampid, tag, sngecomm.Lcs)
359 ll.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
360 exampid, tag, sngecomm.Lcs,
361 sngecomm.Pprof())
363 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
364 exampid, tag, sngecomm.Lcs,
365 runtime.GOMAXPROCS(-1))
367 if sngecomm.SetMAXPROCS() {
368 nc := runtime.NumCPU()
369 ll.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
370 exampid, tag, sngecomm.Lcs,
372 gmp := runtime.GOMAXPROCS(nc)
373 ll.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
374 exampid, tag, sngecomm.Lcs,
375 gmp)
376 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
377 exampid, tag, sngecomm.Lcs,
378 runtime.GOMAXPROCS(-1))
381 sw = sngecomm.SendWait()
382 rw = sngecomm.RecvWait()
383 sf = sngecomm.SendFactor()
384 rf = sngecomm.RecvFactor()
385 ll.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
386 exampid, tag, sngecomm.Lcs,
387 sw, rw, sf, rf)
389 q := sngecomm.Nqs()
391 wga.Add(2)
392 go startReceivers(q)
393 go startSenders(q)
394 wga.Wait()
396 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
397 exampid, tag, sngecomm.Lcs,
398 time.Now().Sub(st))
399 time.Sleep(250 * time.Millisecond)