Cleanup from linter suggestions.
[stompngo_examples.git] / publish / publish.go
blobd1bec792e2d01f0c880386d2d4fc768fd1a0e1a0
1 //
2 // Copyright © 2013-2018 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 Publish messages to a STOMP broker.
20 Examples:
22 # Publish to a broker with all defaults:
23 # Host is "localhost"
24 # Port is 61613
25 # Login is "guest"
26 # Passcode is "guest"
27 # Virtual Host is "localhost"
28 # Protocol is 1.1
29 go run publish.go
31 # Publish to a broker using STOMP protocol level 1.0:
32 STOMP_PROTOCOL=1.0 go run publish.go
34 # Publish to a broker using a custom host and port:
35 STOMP_HOST=tjjackson STOMP_PORT=62613 go run publish.go
37 # Publish to a broker using a custom port and virtual host:
38 STOMP_PORT=41613 STOMP_VHOST="/" go run publish.go
40 # Publish to a broker using a custom login and passcode:
41 STOMP_LOGIN="userid" STOMP_PASSCODE="t0ps3cr3t" go run publish.go
43 # Important environment variables for this program are:
45 # STOMP_NGORS - the number of go routines used to write to the
46 # specified queues.
48 # STOMP_NMSGS - the number of messages each go routine will write.
50 # STOMP_NQS - The number of queues to write messages to. If this
51 # variable is absent, the value defaults to the value specified
52 # for STOMP_NGORS. If this value is specified, all go routines
53 # are multi-plexed across this number of queues.
56 package main
58 import (
59 "fmt"
60 "log"
61 "net"
62 "os"
63 "runtime"
64 "runtime/pprof"
65 "strconv"
66 "sync"
67 "time"
69 "github.com/gmallard/stompngo"
70 // senv methods could be used in general by stompngo clients.
71 "github.com/gmallard/stompngo/senv"
72 // sngecomm methods are used specifically for these example clients.
73 "github.com/gmallard/stompngo_examples/sngecomm"
76 var (
77 exampid = "publish: "
78 ll = log.New(os.Stdout, "EPUB ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
79 tag = "pubmain"
80 wg sync.WaitGroup
81 conn *stompngo.Connection
82 n net.Conn
83 e error
84 nqs int
85 ngor int
86 // MNHDR is the message number, in message headers
87 MNHDR = "sng_msgnum"
88 gorstr = 1 // Starting destination number
90 msfl = true // Fixed message length
91 mslen = 1024 // The fixed length
92 msf []byte
94 gorsl = false
95 gorslfb = false
96 gorslfx = time.Duration(250 * time.Millisecond)
97 gorslms = 250
99 max int64 = 1e9 // Max stagger time (nanoseconds)
100 min = max / 10 // Min stagger time (nanoseconds)
101 // Sleep multipliers
102 sf = 1.0
103 rf = 1.0
106 func init() {
107 ngor = sngecomm.Ngors()
108 nqs = sngecomm.Nqs()
110 // Options around message length:
111 // 1) fixed length
112 // 2) randomly variable length
113 if os.Getenv("STOMP_VARMSL") != "" {
114 msfl = false // Use randomly variable message lengths
116 if msfl {
117 if s := os.Getenv("STOMP_FXMSLEN"); s != "" {
118 i, e := strconv.ParseInt(s, 10, 32)
119 if nil != e {
120 log.Printf("v1:%v v2:%v\n", "FXMSLEN conversion error", e)
121 } else {
122 mslen = int(i) // The fixed length to use
125 msf = sngecomm.PartialSubstr(mslen)
128 // Options controlling sleeps between message sends. Options are:
129 // 1) Don't sleep
130 // 2) Sleep a fixed amount of time
131 // 3) Sleep a random variable amount of time
132 if os.Getenv("STOMP_DOSLEEP") != "" {
133 gorsl = true // Do sleep
135 if os.Getenv("STOMP_FIXSLEEP") != "" {
136 gorslfb = true // Do a fixed length sleep
138 if gorsl {
139 var err error
140 if s := os.Getenv("STOMP_SLEEPMS"); s != "" { // Fixed length milliseconds
141 mss := fmt.Sprintf("%s", s) + "ms"
142 gorslfx, err = time.ParseDuration(mss)
143 if err != nil {
144 log.Printf("v1:%v v2:%v v3:%v\n", "ParseDuration conversion error", mss, e)
149 if s := os.Getenv("STOMP_GORNSTR"); s != "" {
150 i, e := strconv.ParseInt(s, 10, 32)
151 if nil != e {
152 log.Printf("v1:%v v2:%v\n", "GORNSTR conversion error", e)
153 } else {
154 gorstr = int(i) // The fixed length to use
158 func runSends(gr int, qn int) {
159 var err error
160 qns := fmt.Sprintf("%d", qn)
161 qname := sngecomm.Dest() + "." + qns
162 sh := stompngo.Headers{"destination", qname}
163 ll.Printf("%stag:%s connsess:%s destination dest:%s BEGIN_runSends %d\n",
164 exampid, tag, conn.Session(),
165 qname, gr)
166 if senv.Persistent() {
167 sh = sh.Add("persistent", "true")
169 sh = sh.Add(MNHDR, "0")
170 mnhnum := sh.Index(MNHDR)
171 ll.Printf("%stag:%s connsess:%s send headers:%v\n",
172 exampid, tag, conn.Session(),
174 for i := 1; i <= senv.Nmsgs(); i++ {
175 is := fmt.Sprintf("%d", i) // Next message number
176 sh[mnhnum+1] = is // Put message number in headers
177 // Log send headers
178 ll.Printf("%stag:%s connsess:%s main_sending gr:%d hdrs:%v\n",
179 exampid, tag, conn.Session(),
180 gr, sh)
182 // Handle fixed or variable message length
183 rml := 0
184 if msfl {
185 err = conn.SendBytes(sh, msf)
186 rml = len(msf)
187 } else {
188 // ostr := string(sngecomm.Partial())
189 // err = conn.Send(sh, ostr)
190 oby := sngecomm.Partial()
191 err = conn.SendBytes(sh, oby)
192 rml = len(oby)
194 if err != nil {
195 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
196 exampid, tag, conn.Session(),
197 err.Error()) // Handle this ......
199 ll.Printf("%stag:%s connsess:%s main_send_complete gr:%d msfl:~%t~len:%d\n",
200 exampid, tag, conn.Session(),
201 gr, msfl, rml)
203 // Handle sleep options
204 if gorsl {
205 if gorslfb {
206 // Fixed time to sleep
207 ll.Printf("%stag:%s connsess:%s gr:%d main_fixed sleep:~%v\n",
208 exampid, tag, conn.Session(), gr, gorslfx)
209 time.Sleep(gorslfx)
210 } else {
211 // Variable time to sleep
212 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
213 ll.Printf("%stag:%s connsess:%s gr:%d main_rand sleep:~%v\n",
214 exampid, tag, conn.Session(), gr, dt)
215 time.Sleep(dt)
219 if sngecomm.UseEOF() {
220 sh := stompngo.Headers{"destination", qname}
221 _ = conn.Send(sh, sngecomm.EOFMsg)
222 ll.Printf("%stag:%s connsess:%s gr:%d sent EOF [%s]\n",
223 exampid, tag, conn.Session(), gr, sngecomm.EOFMsg)
225 wg.Done() // signal a goroutine completion
228 // Connect to a STOMP broker, publish some messages and disconnect.
229 func main() {
231 if sngecomm.Pprof() {
232 if sngecomm.Cpuprof() != "" {
233 ll.Printf("%stag:%s connsess:%s CPUPROF %s\n",
234 exampid, tag, sngecomm.Lcs, sngecomm.Cpuprof())
235 f, err := os.Create(sngecomm.Cpuprof())
236 if err != nil {
237 log.Fatal("could not create CPU profile: ", err)
239 if err := pprof.StartCPUProfile(f); err != nil {
240 log.Fatal("could not start CPU profile: ", err)
242 defer pprof.StopCPUProfile()
246 st := time.Now()
248 // Standard example connect sequence
249 n, conn, e = sngecomm.CommonConnect(exampid, tag, ll)
250 if e != nil {
251 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
252 exampid, tag, sngecomm.Lcs,
253 e.Error()) // Handle this ......
256 ll.Printf("%stag:%s connsess:%s START gorstr:%d ngor:%d nqs:%d nmsgs:%d\n",
257 exampid, tag, conn.Session(), gorstr, ngor, nqs, senv.Nmsgs())
259 rqn := gorstr - 1
260 for i := gorstr; i <= gorstr+ngor-1; i++ {
261 wg.Add(1)
262 rqn++
263 if nqs > 1 && rqn > nqs {
264 rqn = gorstr
266 go runSends(i, rqn)
268 wg.Wait()
270 // Standard example disconnect sequence
271 e = sngecomm.CommonDisconnect(n, conn, exampid, tag, ll)
272 if e != nil {
273 ll.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
274 exampid, tag, conn.Session(),
275 e.Error()) // Handle this ......
278 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
279 exampid, tag, conn.Session(),
280 time.Now().Sub(st))
282 if sngecomm.Pprof() {
283 if sngecomm.Memprof() != "" {
284 ll.Printf("%stag:%s connsess:%s MEMPROF %s\n",
285 exampid, tag, conn.Session(), sngecomm.Memprof())
286 f, err := os.Create(sngecomm.Memprof())
287 if err != nil {
288 log.Fatal("could not create memory profile: ", err)
290 runtime.GC() // get up-to-date statistics
291 if err := pprof.WriteHeapProfile(f); err != nil {
292 log.Fatal("could not write memory profile: ", err)
294 f.Close()