All examples demonstrate proper way to receive messages:
[stompngo_examples.git] / srmgor_1conn / srmgor_1conn.go
blob3d8d01aef68ea1230b7e60661fe293ad146e8547
1 //
2 // Copyright © 2011-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Send and receive many STOMP messages using multiple queues and goroutines
24 to service each send or receive instance. All senders and receivers share the
25 same STOMP connection.
27 package main
29 import (
30 "fmt"
31 "log"
32 "net"
33 "os"
34 "runtime"
35 "sync"
36 "time"
38 "github.com/davecheney/profile"
40 "github.com/gmallard/stompngo"
41 // senv methods could be used in general by stompngo clients.
42 "github.com/gmallard/stompngo/senv"
43 // sngecomm methods are used specifically for these example clients.
44 "github.com/gmallard/stompngo_examples/sngecomm"
47 var (
48 ll = log.New(os.Stdout, "ECNDS ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
50 exampid = "srmgor_1conn:"
52 wgs sync.WaitGroup
53 wgr sync.WaitGroup
54 wga sync.WaitGroup
56 // We 'stagger' between each message send and message receive for a random
57 // amount of time.
58 // Vary these for experimental purposes. YMMV.
59 max int64 = 1e9 // Max stagger time (nanoseconds)
60 min int64 = max / 10 // Min stagger time (nanoseconds)
62 // Wait flags
63 sw = true
64 rw = true
66 // Sleep multipliers
67 sf float64 = 1.0
68 rf float64 = 1.0
71 n net.Conn // Network Connection
72 conn *stompngo.Connection // Stomp Connection
74 lhl = 44
77 // Send messages to a particular queue
78 func sender(qn, mc int) {
79 qns := fmt.Sprintf("%d", qn) // string queue number
80 id := stompngo.Uuid() // A unique sender id
81 ll.Printf("%s id:%s send_start_queue_number qn:%d\n", exampid, id, qn)
83 d := senv.Dest() + "." + qns
84 ll.Printf("%s id:%s send_queue_name:%s qn:%d\n", exampid, id, d, qn)
85 wh := stompngo.Headers{"destination", d, "senderId", id,
86 "qnum", qns} // send Headers
87 if senv.Persistent() {
88 wh = wh.Add("persistent", "true")
91 tmr := time.NewTimer(100 * time.Hour)
92 // Send loop
93 for i := 1; i <= mc; i++ {
94 si := fmt.Sprintf("%d", i)
95 sh := append(wh, "msgnum", si)
96 // Generate a message to send ...............
97 ll.Printf("%s id:%s send_message qn:%d msgnum:%s\n", exampid, id, qn, si)
98 e := conn.Send(sh, string(sngecomm.Partial()))
99 if e != nil {
100 ll.Fatalln(exampid, id, "send error", e, qn)
102 if sw {
103 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
104 ll.Printf("%s send_stagger dt:%v qn:%d id:%s\n",
105 exampid, dt,
106 qn, id)
107 tmr.Reset(dt)
108 _ = <-tmr.C
109 runtime.Gosched()
112 // Sending is done
113 ll.Printf("%s id:%s send_end_queue_number qn:%d\n", exampid, id, qn)
114 wgs.Done()
117 // Receive messages from a particular queue
118 func receiver(qn, mc int) {
119 qns := fmt.Sprintf("%d", qn) // string queue number
120 pbc := sngecomm.Pbc()
121 id := stompngo.Uuid() // A unique subscription ID
123 d := senv.Dest() + "." + qns
124 ll.Printf("%s id:%s recv_queue_name:%s qn:%s\n", exampid, id, d, qns)
125 // Subscribe
126 sc := sngecomm.HandleSubscribe(conn, d, id, sngecomm.AckMode())
128 tmr := time.NewTimer(100 * time.Hour)
129 var md stompngo.MessageData
130 // Receive loop
131 for i := 1; i <= mc; i++ {
132 ll.Printf("%s id:%s recv_ranchek qn:%d chlen:%d chcap:%d\n", exampid, id,
133 qn, len(sc), cap(sc))
135 select {
136 case md = <-sc:
137 case md = <-conn.MessageData:
138 // A RECEIPT or ERROR frame is unexpected here
139 ll.Fatalln(exampid, md) // Handle this
141 if md.Error != nil {
142 ll.Fatalln(exampid, id, "recv error", md.Error, qn)
145 // Process the inbound message .................
146 ll.Printf("%s id:%s recv_message qn:%d msgnum:%d\n", exampid, id, qn, i)
147 if pbc > 0 {
148 maxlen := pbc
149 if len(md.Message.Body) < maxlen {
150 maxlen = len(md.Message.Body)
152 ss := string(md.Message.Body[0:maxlen])
153 ll.Printf("%s Payload: %s\n", exampid, ss) // Data payload
156 // Sanity check the message Command, and the queue and message numbers
157 mns := fmt.Sprintf("%d", i) // message number
158 if md.Message.Command != stompngo.MESSAGE {
159 ll.Fatalln(exampid, "Bad Frame", md, qn, mns)
161 if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mns) {
162 ll.Fatalln(exampid, "Bad Headers", md.Message.Headers, qn, mns)
165 if rw {
166 dt := time.Duration(sngecomm.ValueBetween(min, max, rf))
167 ll.Printf("%s recv_stagger dt:%v qn:%d id:%s\n",
168 exampid, dt,
169 qn, id)
170 tmr.Reset(dt)
171 _ = <-tmr.C
172 runtime.Gosched()
175 // Handle ACKs if needed
176 if sngecomm.AckMode() != "auto" {
177 sngecomm.HandleAck(conn, md.Message.Headers, id)
180 // Unsubscribe
181 sngecomm.HandleUnsubscribe(conn, d, id)
183 // Receiving is done
184 ll.Printf("%s id:%s recv_end_queue_number qn:%d\n", exampid, id, qn)
185 wgr.Done()
189 Start all sender go routines.
191 func startSenders(nqs int) {
192 ll.Printf("%s startSenders_starts nqs:%d\n", exampid, nqs)
194 mc := senv.Nmsgs() // message count
195 // nqs)
196 ll.Printf("%s startSenders_message_count mc:%d nqs:%d\n", exampid, mc, nqs)
197 for i := 1; i <= nqs; i++ { // all queues
198 wgs.Add(1)
199 go sender(i, mc)
201 wgs.Wait()
203 ll.Printf("%s startSenders_endsexampid nqs:%d\n", exampid, nqs)
204 wga.Done()
208 Start all receiver go routines.
210 func startReceivers(nqs int) {
211 ll.Printf("%s startReceivers_starts nqs:%d\n", exampid, "startReceivers starts", nqs)
213 mc := senv.Nmsgs() // get message count
214 ll.Printf("%s startReceivers_message_count mc:%d nqs:%d\n", exampid, mc, nqs)
215 for i := 1; i <= nqs; i++ { // all queues
216 wgr.Add(1)
217 go receiver(i, mc)
219 wgr.Wait()
221 ll.Printf("%s startReceivers_ends nqs:%d\n", exampid, nqs)
222 wga.Done()
225 // Show a number of writers and readers operating concurrently from unique
226 // destinations.
227 func main() {
228 sngecomm.ShowRunParms(exampid)
230 if sngecomm.Pprof() {
231 cfg := profile.Config{
232 MemProfile: true,
233 CPUProfile: true,
234 BlockProfile: true,
235 NoShutdownHook: false, // Hook SIGINT
237 defer profile.Start(&cfg).Stop()
240 start := time.Now()
241 ll.Println(exampid, "main starts")
242 ll.Println(exampid, "main profiling", sngecomm.Pprof())
243 ll.Println(exampid, "main current number of GOMAXPROCS is:", runtime.GOMAXPROCS(-1))
244 if sngecomm.SetMAXPROCS() {
245 nc := runtime.NumCPU()
246 ll.Println(exampid, "main number of CPUs is:", nc)
247 gmp := runtime.GOMAXPROCS(nc)
248 ll.Println(exampid, "main previous number of GOMAXPROCS is:", gmp)
249 ll.Println(exampid, "main current number of GOMAXPROCS is:", runtime.GOMAXPROCS(-1))
251 // Wait flags
252 sw = sngecomm.SendWait()
253 rw = sngecomm.RecvWait()
254 sf = sngecomm.SendFactor()
255 rf = sngecomm.RecvFactor()
256 ll.Println(exampid, "main Sleep Factors", "send", sf, "recv", rf)
257 // Number of queues
258 nqs := sngecomm.Nqs()
259 // Open net and stomp connections
260 h, p := senv.HostAndPort() // network connection host and port
261 var e error
262 // Network open
263 n, e = net.Dial("tcp", net.JoinHostPort(h, p))
264 if e != nil {
265 ll.Fatalln(exampid, "main dial error", e) // Handle this ......
267 // Stomp connect, 1.1(+)
268 ch := sngecomm.ConnectHeaders()
269 ll.Println(exampid, "vhost:", senv.Vhost(), "protocol:", senv.Protocol())
270 conn, e = stompngo.Connect(n, ch)
271 if e != nil {
272 ll.Fatalln(exampid, "main connect error", e) // Handle this ......
275 // Many receivers running under the same connection can cause
276 // (wire read) performance issues. This is *very* dependent on the broker
277 // being used, specifically the broker's algorithm for putting messages on
278 // the wire.
279 // To alleviate those issues, this strategy insures that messages are
280 // received from the wire as soon as possible. Those messages are then
281 // buffered internally for (possibly later) application processing. In
282 // this example, buffering occurs in the stompngo package.
283 conn.SetSubChanCap(senv.SubChanCap()) // Experiment with this value, YMMV
285 // Run everything
286 wga.Add(2)
287 go startReceivers(nqs)
288 go startSenders(nqs)
289 wga.Wait()
291 // Disconnect from Stomp server
292 e = conn.Disconnect(stompngo.Headers{})
293 if e != nil {
294 ll.Fatalln(exampid, "main disconnect error", e) // Handle this ......
296 // Network close
297 e = n.Close()
298 if e != nil {
299 ll.Fatalln(exampid, "main netclose error", e) // Handle this ......
301 sngecomm.ShowStats(exampid, "done", conn)
302 dur := time.Since(start)
303 ll.Println(exampid, "main ends", dur)