Changes to publish.go:
[stompngo_examples.git] / srmgor_1conn / srmgor_1conn.go
blob445da4b8f5aa51f0a757aad7dcc4c3616257cd4c
1 //
2 // Copyright © 2011-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Send and receive many STOMP messages using multiple queues and goroutines
24 to service each send or receive instance. All senders and receivers share the
25 same STOMP connection.
27 package main
29 import (
30 "fmt"
31 "log"
32 "net"
33 "os"
34 "runtime"
35 "sync"
36 "time"
38 "github.com/gmallard/stompngo"
39 // senv methods could be used in general by stompngo clients.
40 "github.com/gmallard/stompngo/senv"
41 // sngecomm methods are used specifically for these example clients.
42 "github.com/gmallard/stompngo_examples/sngecomm"
45 var (
46 ll = log.New(os.Stdout, "ECNDS ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
48 exampid = "srmgor_1conn: "
50 wgs sync.WaitGroup
51 wgr sync.WaitGroup
52 wga sync.WaitGroup
54 // We 'stagger' between each message send and message receive for a random
55 // amount of time.
56 // Vary these for experimental purposes. YMMV.
57 max int64 = 1e9 // Max stagger time (nanoseconds)
58 min int64 = max / 10 // Min stagger time (nanoseconds)
60 // Wait flags
61 sw = true
62 rw = true
64 // Sleep multipliers
65 sf float64 = 1.0
66 rf float64 = 1.0
69 n net.Conn // Network Connection
70 conn *stompngo.Connection // Stomp Connection
72 lhl = 44
74 tag = "1conn"
77 // Send messages to a particular queue
78 func sender(qn, mc int) {
79 ltag := tag + "-sender"
81 qns := fmt.Sprintf("%d", qn) // string queue number
82 id := stompngo.Uuid() // A unique sender id
83 d := sngecomm.Dest() + "." + string(exampid[:len(exampid)-2]) + "." + qns
85 ll.Printf("%stag:%s connsess:%s queue_info id:%v d:%v qnum:%v mc:%v\n",
86 exampid, ltag, conn.Session(),
87 id, d, qn, mc)
89 wh := stompngo.Headers{"destination", d, "senderId", id,
90 "qnum", qns} // send Headers
91 if senv.Persistent() {
92 wh = wh.Add("persistent", "true")
95 tmr := time.NewTimer(100 * time.Hour)
96 // Send loop
97 for i := 1; i <= mc; i++ {
98 si := fmt.Sprintf("%d", i)
99 sh := append(wh, "msgnum", si)
100 // Generate a message to send ...............
101 ll.Printf("%stag:%s connsess:%s send_headers id:%v d:%v qnum:%v headers:%v\n",
102 exampid, ltag, conn.Session(),
103 id, d, qn, sh)
104 e := conn.Send(sh, string(sngecomm.Partial()))
105 if e != nil {
106 ll.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
107 exampid, tag, conn.Session(),
108 qn, e.Error()) // Handle this ......
110 if i == mc {
111 break
113 if sw {
114 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
115 ll.Printf("%stag:%s connsess:%s send_stagger id:%v d:%v qnum:%v stagger:%v\n",
116 exampid, ltag, conn.Session(),
117 id, d, qn, dt)
118 tmr.Reset(dt)
119 _ = <-tmr.C
120 runtime.Gosched()
123 // Sending is done
124 ll.Printf("%stag:%s connsess:%s finish_info id:%v d:%v qnum:%v mc:%v\n",
125 exampid, ltag, conn.Session(),
126 id, d, qn, mc)
127 wgs.Done()
130 // Receive messages from a particular queue
131 func receiver(qn, mc int) {
132 ltag := tag + "-receiver"
134 qns := fmt.Sprintf("%d", qn) // string queue number
135 pbc := sngecomm.Pbc()
136 id := stompngo.Uuid() // A unique subscription ID
137 d := sngecomm.Dest() + "." + string(exampid[:len(exampid)-2]) + "." + qns
139 ll.Printf("%stag:%s connsess:%s queue_info id:%v d:%v qnum:%v mc:%v\n",
140 exampid, ltag, conn.Session(),
141 id, d, qn, mc)
142 // Subscribe
143 sc := sngecomm.HandleSubscribe(conn, d, id, sngecomm.AckMode())
144 ll.Printf("%stag:%s connsess:%s subscribe_complete id:%v d:%v qnum:%v mc:%v\n",
145 exampid, ltag, conn.Session(),
146 id, d, qn, mc)
148 tmr := time.NewTimer(100 * time.Hour)
149 var md stompngo.MessageData
150 // Receive loop
151 for i := 1; i <= mc; i++ {
152 ll.Printf("%stag:%s connsess:%s recv_ranchek id:%v d:%v qnum:%v mc:%v chlen:%v chcap:%v\n",
153 exampid, ltag, conn.Session(),
154 id, d, qn, mc, len(sc), cap(sc))
156 select {
157 case md = <-sc:
158 case md = <-conn.MessageData:
159 // A RECEIPT or ERROR frame is unexpected here
160 ll.Fatalf("%stag:%s connsess:%s bad_frame qnum:%v headers:%v body:%s",
161 exampid, tag, conn.Session(),
162 qn, md.Message.Headers, md.Message.Body) // Handle this ......
164 if md.Error != nil {
165 ll.Fatalf("%stag:%s connsess:%s recv_error qnum:%v error:%v",
166 exampid, tag, conn.Session(),
167 qn, md.Error) // Handle this ......
170 // Process the inbound message .................
171 ll.Printf("%stag:%s connsess:%s recv_message qnum:%v i:%v\n",
172 exampid, tag, conn.Session(),
173 qn, i)
174 if pbc > 0 {
175 maxlen := pbc
176 if len(md.Message.Body) < maxlen {
177 maxlen = len(md.Message.Body)
179 ss := string(md.Message.Body[0:maxlen])
180 ll.Printf("%stag:%s connsess:%s payload qnum:%v body:%s\n",
181 exampid, tag, conn.Session(),
182 qn, ss)
186 // Sanity check the message Command, and the queue and message numbers
187 mns := fmt.Sprintf("%d", i) // message number
188 if md.Message.Command != stompngo.MESSAGE {
189 ll.Fatalf("%stag:%s connsess:%s bad_frame qnum:%v command:%v headers:%v body:%v\n",
190 exampid, tag, conn.Session(),
191 qn, md.Message.Command, md.Message.Headers, string(md.Message.Body)) // Handle this ......
194 if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mns) {
195 ll.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v command:%v headers:%v body:%v\n",
196 exampid, tag, conn.Session(),
197 qns, mns, md.Message.Command, md.Message.Headers, string(md.Message.Body)) // Handle this ......) // Handle this ......
200 if i == mc {
201 break
204 if rw {
205 dt := time.Duration(sngecomm.ValueBetween(min, max, rf))
206 ll.Printf("%stag:%s connsess:%s recv_stagger id:%v d:%v qnum:%v stagger:%v\n",
207 exampid, ltag, conn.Session(),
208 id, d, qn, dt)
209 tmr.Reset(dt)
210 _ = <-tmr.C
211 runtime.Gosched()
214 // Handle ACKs if needed
215 if sngecomm.AckMode() != "auto" {
216 sngecomm.HandleAck(conn, md.Message.Headers, id)
219 // Unsubscribe
220 sngecomm.HandleUnsubscribe(conn, d, id)
221 ll.Printf("%stag:%s connsess:%s unsubscribe_complete id:%v d:%v qnum:%v mc:%v\n",
222 exampid, ltag, conn.Session(),
223 id, d, qn, mc)
225 // Receiving is done
226 ll.Printf("%stag:%s connsess:%s recv_end id:%v d:%v qnum:%v mc:%v\n",
227 exampid, ltag, conn.Session(),
228 id, d, qn, mc)
230 wgr.Done()
234 Start all sender go routines.
236 func startSenders(nqs int) {
237 ltag := tag + "-startsenders"
239 ll.Printf("%stag:%s connsess:%s queue_count nqs:%v\n",
240 exampid, ltag, conn.Session(),
241 nqs)
243 mc := senv.Nmsgs() // message count
244 ll.Printf("%stag:%s connsess:%s message_count mc:%v\n",
245 exampid, ltag, conn.Session(),
247 for i := 1; i <= nqs; i++ { // all queues
248 wgs.Add(1)
249 go sender(i, mc)
251 wgs.Wait()
253 ll.Printf("%stag:%s connsess:%s ends nqs:%v mc:%v\n",
254 exampid, ltag, conn.Session(),
255 nqs, mc)
256 wga.Done()
260 Start all receiver go routines.
262 func startReceivers(nqs int) {
263 ltag := tag + "-startreceivers"
265 ll.Printf("%stag:%s connsess:%s queue_count nqs:%v\n",
266 exampid, ltag, conn.Session(),
267 nqs)
269 mc := senv.Nmsgs() // get message count
270 ll.Printf("%stag:%s connsess:%s message_count mc:%v\n",
271 exampid, ltag, conn.Session(),
274 for i := 1; i <= nqs; i++ { // all queues
275 wgr.Add(1)
276 go receiver(i, mc)
278 wgr.Wait()
280 ll.Printf("%stag:%s connsess:%s ends nqs:%v mc:%v\n",
281 exampid, ltag, conn.Session(),
282 nqs, mc)
283 wga.Done()
286 // Show a number of writers and readers operating concurrently from unique
287 // destinations.
288 func main() {
290 st := time.Now()
292 sngecomm.ShowRunParms(exampid)
294 ll.Printf("%stag:%s connsess:%s main_starts\n",
295 exampid, tag, sngecomm.Lcs)
297 ll.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
298 exampid, tag, sngecomm.Lcs,
299 sngecomm.Pprof())
301 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
302 exampid, tag, sngecomm.Lcs,
303 runtime.GOMAXPROCS(-1))
305 if sngecomm.SetMAXPROCS() {
306 nc := runtime.NumCPU()
307 ll.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
308 exampid, tag, sngecomm.Lcs,
310 gmp := runtime.GOMAXPROCS(nc)
311 ll.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
312 exampid, tag, sngecomm.Lcs,
313 gmp)
314 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
315 exampid, tag, sngecomm.Lcs,
316 runtime.GOMAXPROCS(-1))
318 // Wait flags
319 sw = sngecomm.SendWait()
320 rw = sngecomm.RecvWait()
321 sf = sngecomm.SendFactor()
322 rf = sngecomm.RecvFactor()
323 ll.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
324 exampid, tag, sngecomm.Lcs,
325 sw, rw, sf, rf)
326 // Number of queues
327 nqs := sngecomm.Nqs()
329 // Standard example connect sequence
330 var e error
331 n, conn, e = sngecomm.CommonConnect(exampid, tag, ll)
332 if e != nil {
333 if conn != nil {
334 ll.Printf("%stag:%s connsess:%s Connect Response headers:%v body%s\n",
335 exampid, tag, conn.Session(), conn.ConnectResponse.Headers,
336 string(conn.ConnectResponse.Body))
338 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
339 exampid, tag, sngecomm.Lcs,
340 e.Error()) // Handle this ......
343 // Many receivers running under the same connection can cause
344 // (wire read) performance issues. This is *very* dependent on the broker
345 // being used, specifically the broker's algorithm for putting messages on
346 // the wire.
347 // To alleviate those issues, this strategy insures that messages are
348 // received from the wire as soon as possible. Those messages are then
349 // buffered internally for (possibly later) application processing. In
350 // this example, buffering occurs in the stompngo package.
351 conn.SetSubChanCap(senv.SubChanCap()) // Experiment with this value, YMMV
353 // Run everything
354 wga.Add(2)
355 go startReceivers(nqs)
356 go startSenders(nqs)
357 wga.Wait()
359 // Standard example disconnect sequence
360 e = sngecomm.CommonDisconnect(n, conn, exampid, tag, ll)
361 if e != nil {
362 ll.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
363 exampid, tag, conn.Session(),
364 e.Error()) // Handle this ......
367 sngecomm.ShowStats(exampid, tag, conn)
369 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
370 exampid, tag, conn.Session(),
371 time.Now().Sub(st))
373 time.Sleep(250 * time.Millisecond)