Optionally use connection logger.
[stompngo_examples.git] / srmgor_1smrconn / srmgor_1smrconn.go
blobe97f8f7d7794d0db417ad46234c4ca8329246657
1 //
2 // Copyright © 2014-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receivers check each message for the proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. All senders use one STOMP connection.
24 All receivers are balanced across multiple STOMP connections. Balancing
25 configuration is taken from environment variables.
27 package main
29 import (
30 "fmt"
31 "log"
32 "net"
33 "os"
34 "runtime"
35 "strconv"
36 "sync"
37 "time"
39 "github.com/gmallard/stompngo"
40 // senv methods could be used in general by stompngo clients.
41 "github.com/gmallard/stompngo/senv"
42 // sngecomm methods are used specifically for these example clients.
43 "github.com/gmallard/stompngo_examples/sngecomm"
// Package-level state shared by the goroutines of this example.
var (
	// exampid prefixes every log line written by this program; tag is the
	// stem from which each function builds its own log tag.
	exampid = "srmgor_1smrconn: "
	tag     = "1smrconn"

	// Senders and receivers 'stagger' (pause) for a random amount of time
	// between each message send and each message receive.
	// Vary these for experimental purposes. YMMV.
	max int64 = 1e9      // Max stagger time (nanoseconds)
	min int64 = max / 10 // Min stagger time (nanoseconds)

	// Wait flags: stagger on send (sw) and on receive (rw). main overwrites
	// both from the sngecomm settings before any goroutine is started.
	sw, rw = true, true

	// Sleep multipliers handed to sngecomm.ValueBetween for the send (sf)
	// and receive (rf) stagger. main overwrites these as well.
	sf, rf float64 = 1.0, 1.0

	// NOTE(review): lhl is not referenced in the visible part of this file.
	lhl = 44

	// wgs counts running sender goroutines, wgr counts running receiver
	// connections, and wga counts the two top-level workers started by main.
	wgs, wgr, wga sync.WaitGroup

	// ll is the logger used by every function in this file.
	ll = log.New(os.Stdout, "E1SMR ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
)
75 openSconn opens a stompngo Connection.
77 func openSconn() (net.Conn, *stompngo.Connection) {
78 ltag := tag + "-opensconn"
80 // Standard example connect sequence
81 n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll)
82 if e != nil {
83 ll.Fatalf("%stag:%s consess:%s connect_error error:%s\n",
84 exampid, ltag, sngecomm.Lcs,
85 e.Error()) // Handle this ......
87 return n, conn
91 closeSconn closes a stompngo Connection.
93 func closeSconn(n net.Conn, conn *stompngo.Connection) {
94 ltag := tag + "-closesconn"
96 // Standard example disconnect sequence
97 e := sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll)
98 if e != nil {
99 ll.Fatalf("%stag:%s connsess:%s disconnect_error error:%s\n",
100 exampid, ltag, conn.Session(),
101 e.Error()) // Handle this ......
103 return
107 runReceive receives all messages from a specified queue.
109 func runReceive(conn *stompngo.Connection, q int, w *sync.WaitGroup) {
110 ltag := tag + "-runreceive"
112 qns := fmt.Sprintf("%d", q) // queue number
113 id := stompngo.Uuid() // A unique subscription ID
114 d := sngecomm.Dest() + "." + string(exampid[:len(exampid)-2]) + "." + qns
116 ll.Printf("%stag:%s connsess:%s starts id:%s qns:%s d:%s\n",
117 exampid, ltag, conn.Session(),
118 id, qns, d)
120 // Subscribe (use common helper)
121 sc := sngecomm.HandleSubscribe(conn, d, id, sngecomm.AckMode())
122 ll.Printf("%stag:%s connsess:%s subscribe_done id:%s qns:%s d:%s\n",
123 exampid, ltag, conn.Session(),
124 id, qns, d)
127 tmr := time.NewTimer(100 * time.Hour)
129 pbc := sngecomm.Pbc() // Print byte count
131 nmsgs := senv.Nmsgs()
133 // Receive loop
134 var md stompngo.MessageData
135 for mc := 1; mc <= nmsgs; mc++ {
136 ll.Printf("%stag:%s connsess:%s chanchek id:%s qns:%s lensc:%d capsc:%d\n",
137 exampid, ltag, conn.Session(),
138 id, qns, len(sc), cap(sc))
139 select {
140 case md = <-sc:
141 case md = <-conn.MessageData:
142 // Frames RECEIPT or ERROR not expected here
143 ll.Fatalf("%stag:%s connsess:%s send_error qns:%v md:%v",
144 exampid, ltag, conn.Session(),
145 qns, md) // Handle this ......
148 if md.Error != nil {
149 ll.Fatalf("%stag:%s connsess:%s receive_error qns:%v error:%v\n",
150 exampid, ltag, conn.Session(),
151 qns, md.Error)
154 // Process the inbound message .................
155 ll.Printf("%stag:%s connsess:%s inbound id:%s qns:%s mc:%d\n",
156 exampid, ltag, conn.Session(),
157 id, qns, mc)
158 // Sanity check the message Command, and the queue and message numbers
159 mns := fmt.Sprintf("%d", mc) // string message number
160 if md.Message.Command != stompngo.MESSAGE {
161 ll.Fatalf("%stag:%s connsess:%s bad_frame qns:%s mc:%d md:%v\n",
162 exampid, ltag, conn.Session(),
163 qns, mc, md)
165 if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mns) {
166 ll.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v md:%v",
167 exampid, tag, conn.Session(),
168 qns, mns, md) // Handle this ......
171 sl := len(md.Message.Body)
172 if pbc > 0 {
173 sl = pbc
174 if len(md.Message.Body) < sl {
175 sl = len(md.Message.Body)
179 ll.Printf("%stag:%s connsess:%s runReceive_recv_message id:%s body:%s qns:%s msgnum:%s\n",
180 exampid, ltag, conn.Session(),
181 id, string(md.Message.Body[0:sl]), qns,
182 md.Message.Headers.Value("msgnum"))
184 // Handle ACKs if needed
185 if sngecomm.AckMode() != "auto" {
186 ah := stompngo.Headers{}
187 sngecomm.HandleAck(conn, ah, id)
189 if mc == nmsgs {
190 break
192 if rw {
193 dt := time.Duration(sngecomm.ValueBetween(min, max, rf))
194 ll.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s mc:%d\n",
195 exampid, ltag, conn.Session(),
196 dt, qns, mc)
197 tmr.Reset(dt)
198 _ = <-tmr.C
199 runtime.Gosched()
202 // Unsubscribe
203 sngecomm.HandleUnsubscribe(conn, d, id)
205 ll.Printf("%stag:%s connsess:%s runRecieve_ends id:%s qns:%s\n",
206 exampid, ltag, conn.Session(),
207 id, qns)
208 w.Done()
212 receiverConnection starts individual receivers for this connection.
214 func receiverConnection(conn *stompngo.Connection, cn, qpc int) {
215 ltag := tag + "-receiverconnection"
217 ll.Printf("%stag:%s connsess:%s starts cn:%d qpc:%d\n",
218 exampid, ltag, conn.Session(),
219 cn, qpc)
221 // cn -> a connection number: 1..n
222 // qpc -> destinations per connection
223 // Ex:
224 // 1, 2
225 // 2, 2
226 // 3, 2
228 // This code runs *once* for each connection
230 // These calcs are what causes a skip below. It is a safety valve to keep
231 // from starting one too many connections.
232 cb := cn - 1 // this connection number, zero based
233 q1 := qpc*cb + 1 // 1st queue number
234 ql := q1 + qpc - 1 // last queue number
235 if ql > sngecomm.Nqs() {
236 ql = sngecomm.Nqs() // truncate last if over max destinations
239 var wgrconn sync.WaitGroup
241 var skipped bool
242 if q1 <= ql {
243 ll.Printf("%stag:%s connsess:%s startq cn:%d q1:%d ql: %d\n",
244 exampid, ltag, conn.Session(),
245 cn, q1, ql)
246 skipped = false
247 } else {
248 // Skips are possible, at least with the current calling code, see above
249 ll.Printf("%stag:%s connsess:%s startskip cn:%d q1:%d ql: %d\n",
250 exampid, ltag, conn.Session(),
251 cn, q1, ql)
252 skipped = true
255 for q := q1; q <= ql; q++ {
256 wgrconn.Add(1)
257 go runReceive(conn, q, &wgrconn)
259 wgrconn.Wait()
261 ll.Printf("%stag:%s connsess:%s ends cn:%d qpc:%d skipped:%t\n",
262 exampid, ltag, conn.Session(),
263 cn, qpc, skipped)
264 wgr.Done()
268 startReceivers creates connections per environment variables, and starts each
269 connection.
271 func startReceivers() {
273 ltag := tag + "-startreceivers"
275 // This was a performance experiment. With number of connections.
276 // My recollection is that it did not work out.
277 // However ..... I will leave this code in place for now.
279 // Figure out number of receiver connections wanted
280 nrc := sngecomm.Nqs() // 1 receiver per each destination
281 nqs := nrc // Number of queues (destinations) starts the same
283 if s := os.Getenv("STOMP_RECVCONNS"); s != "" {
284 i, e := strconv.ParseInt(s, 10, 32)
285 if nil != e {
286 ll.Fatalf("%stag:%s connsess:%s RECVCONNS_conversion_error error:%v\n",
287 exampid, ltag, sngecomm.Lcs,
288 e.Error())
289 } else {
290 nrc = int(i)
294 // Limit max receiver connection count to number of destinations
295 if nrc > nqs {
296 nrc = nqs
299 // Next calc. destinations per receiver
300 dpr := nqs / nrc // Calculation first guess.
301 if nqs%nrc != 0 {
302 dpr += 1 // Bump destinations per receiver by 1.
304 // Destinations per receiver must be at least 1
305 if dpr == 0 {
306 dpr = 1
309 ll.Printf("%stag:%s connsess:%s start nrc:%d dpr:%d\n",
310 exampid, ltag, sngecomm.Lcs,
311 nrc, dpr)
313 // So the idea seems to be allow more than one destination per receiver
314 ncm := make([]net.Conn, 0)
315 csm := make([]*stompngo.Connection, 0)
316 for c := 1; c <= nrc; c++ { // :-)
317 n, conn := openSconn()
318 ncm = append(ncm, n)
319 csm = append(csm, conn)
320 wgr.Add(1)
321 ll.Printf("%stag:%s connsess:%s connstart conn_number:%d nrc:%d dpr:%d\n",
322 exampid, ltag, conn.Session(),
323 c, nrc, dpr)
324 go receiverConnection(conn, c, dpr)
326 wgr.Wait()
327 ll.Printf("%stag:%s connsess:%s wait_done nrc:%d dpr:%d\n",
328 exampid, ltag, sngecomm.Lcs,
329 nrc, dpr)
331 for c := 1; c <= nrc; c++ {
332 ll.Printf("%stag:%s connsess:%s connend conn_number:%d nrc:%d dpr:%d\n",
333 exampid, ltag, csm[c-1].Session(),
334 c, nrc, dpr)
335 sngecomm.ShowStats(exampid, ltag, csm[c-1])
336 closeSconn(ncm[c-1], csm[c-1])
339 wga.Done()
343 runSender sends all messages to a specified queue.
345 func runSender(conn *stompngo.Connection, qns string) {
346 ltag := tag + "-runsender"
348 d := sngecomm.Dest() + "." + string(exampid[:len(exampid)-2]) + "." + qns
349 id := stompngo.Uuid() // A unique sender id
350 ll.Printf("%stag:%s connsess:%s start id:%s dest:%s\n",
351 exampid, ltag, conn.Session(),
352 id, d)
353 wh := stompngo.Headers{"destination", d, "senderId", id,
354 "qnum", qns} // basic send Headers
355 if senv.Persistent() {
356 wh = wh.Add("persistent", "true")
358 tmr := time.NewTimer(100 * time.Hour)
359 nmsgs := senv.Nmsgs()
360 for mc := 1; mc <= nmsgs; mc++ {
361 sh := append(wh, "msgnum", fmt.Sprintf("%d", mc))
362 // Generate a message to send ...............
363 ll.Printf("%stag:%s connsess:%s send id:%s qns:%s mc:%d\n",
364 exampid, ltag, conn.Session(),
365 id, qns, mc)
366 e := conn.Send(sh, string(sngecomm.Partial()))
367 if e != nil {
368 ll.Fatalf("%stag:%s connsess:%s send_error qns:%v error:%v",
369 exampid, ltag, conn.Session(),
370 qns, e.Error()) // Handle this ......
372 if mc == nmsgs {
373 break
375 if sw {
376 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
377 ll.Printf("%stag:%s connsess:%s send_stagger dt:%v qns:%s mc:%d\n",
378 exampid, ltag, conn.Session(),
379 dt, qns, mc)
380 tmr.Reset(dt)
381 _ = <-tmr.C
382 runtime.Gosched()
385 ll.Printf("%stag:%s connsess:%s end id:%s dest:%s\n",
386 exampid, ltag, conn.Session(),
387 id, d)
389 wgs.Done()
393 startSender initializes the single send connection, and starts one sender go
394 for each destination.
396 func startSender() {
397 ltag := tag + "-startsender"
399 n, conn := openSconn()
400 ll.Printf("%stag:%s connsess:%s start\n",
401 exampid, ltag, conn.Session())
402 for i := 1; i <= sngecomm.Nqs(); i++ {
403 wgs.Add(1)
404 go runSender(conn, fmt.Sprintf("%d", i))
406 wgs.Wait()
407 ll.Printf("%stag:%s connsess:%s end\n",
408 exampid, ltag, conn.Session())
409 sngecomm.ShowStats(exampid, ltag, conn)
410 closeSconn(n, conn)
412 wga.Done()
416 main is the driver for all logic.
418 func main() {
420 st := time.Now()
422 sngecomm.ShowRunParms(exampid)
424 ll.Printf("%stag:%s connsess:%s main_starts\n",
425 exampid, tag, sngecomm.Lcs)
427 ll.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
428 exampid, tag, sngecomm.Lcs,
429 sngecomm.Pprof())
431 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
432 exampid, tag, sngecomm.Lcs,
433 runtime.GOMAXPROCS(-1))
435 if sngecomm.SetMAXPROCS() {
436 nc := runtime.NumCPU()
437 ll.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
438 exampid, tag, sngecomm.Lcs,
440 gmp := runtime.GOMAXPROCS(nc)
441 ll.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
442 exampid, tag, sngecomm.Lcs,
443 gmp)
444 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
445 exampid, tag, sngecomm.Lcs,
446 runtime.GOMAXPROCS(-1))
448 // Wait flags
449 sw = sngecomm.SendWait()
450 rw = sngecomm.RecvWait()
451 sf = sngecomm.SendFactor()
452 rf = sngecomm.RecvFactor()
453 ll.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
454 exampid, tag, sngecomm.Lcs,
455 sw, rw, sf, rf)
457 wga.Add(1)
458 go startReceivers()
459 wga.Add(1)
460 go startSender()
461 wga.Wait()
463 // The end
464 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
465 exampid, tag, sngecomm.Lcs,
466 time.Now().Sub(st))
467 time.Sleep(250 * time.Millisecond)