Changes to publish.go:
[stompngo_examples.git] / publish / publish.go
blobb9ee27c7ed0f6fb572b2f0e2373ce068bcaced39
1 //
2 // Copyright © 2013-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 Publish messages to a STOMP broker.
20 Examples:
22 # Publish to a broker with all defaults:
23 # Host is "localhost"
24 # Port is 61613
25 # Login is "guest"
26 # Passcode is "guest
27 # Virtual Host is "localhost"
28 # Protocol is 1.1
29 go run publish.go
31 # Publish to a broker using STOMP protocol level 1.0:
32 STOMP_PROTOCOL=1.0 go run publish.go
34 # Publish to a broker using a custom host and port:
35 STOMP_HOST=tjjackson STOMP_PORT=62613 go run publish.go
37 # Publish to a broker using a custom port and virtual host:
38 STOMP_PORT=41613 STOMP_VHOST="/" go run publish.go
40 # Publish to a broker using a custom login and passcode:
41 STOMP_LOGIN="userid" STOMP_PASSCODE="t0ps3cr3t" go run publish.go
44 package main
46 import (
47 "fmt"
48 "log"
49 "net"
50 "os"
51 "runtime"
52 "runtime/pprof"
53 "strconv"
54 "sync"
55 "time"
57 "github.com/gmallard/stompngo"
58 // senv methods could be used in general by stompngo clients.
59 "github.com/gmallard/stompngo/senv"
60 // sngecomm methods are used specifically for these example clients.
61 "github.com/gmallard/stompngo_examples/sngecomm"
64 var (
65 exampid = "publish: "
66 ll = log.New(os.Stdout, "EPUB ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
67 tag = "pubmain"
68 wg sync.WaitGroup
69 conn *stompngo.Connection
70 n net.Conn
71 e error
72 nqs int
73 ngor int
74 MNHDR = "sng_msgnum"
75 gorstr = 1 // Starting destination number
77 msfl = true // Fixed message length
78 mslen = 1024 // The fixed length
79 msf []byte
81 gorsl = false
82 gorslfb = false
83 gorslfx = time.Duration(250 * time.Millisecond)
84 gorslms = 250
86 max int64 = 1e9 // Max stagger time (nanoseconds)
87 min int64 = max / 10 // Min stagger time (nanoseconds)
88 // Sleep multipliers
89 sf float64 = 1.0
90 rf float64 = 1.0
93 func init() {
94 ngor = sngecomm.Ngors()
95 nqs = sngecomm.Nqs()
97 // Options around message length:
98 // 1) fixed length
99 // 2) randomly variable length
100 if os.Getenv("STOMP_VARMSL") != "" {
101 msfl = false // Use randomly variable message lengths
103 if msfl {
104 if s := os.Getenv("STOMP_FXMSLEN"); s != "" {
105 i, e := strconv.ParseInt(s, 10, 32)
106 if nil != e {
107 log.Printf("v1:%v v2:%v\n", "FXMSLEN conversion error", e)
108 } else {
109 mslen = int(i) // The fixed length to use
112 msf = sngecomm.PartialSubstr(mslen)
115 // Options controlling sleeps between message sends. Options are:
116 // 1) Don't sleep
117 // 2) Sleep a fixed amount of time
118 // 3) Sleep a random variable amount of time
119 if os.Getenv("STOMP_DOSLEEP") != "" {
120 gorsl = true // Do sleep
122 if os.Getenv("STOMP_FIXSLEEP") != "" {
123 gorslfb = true // Do a fixed length sleep
125 if gorsl {
126 var err error
127 if s := os.Getenv("STOMP_SLEEPMS"); s != "" { // Fixed length milliseconds
128 mss := fmt.Sprintf("%s", s) + "ms"
129 gorslfx, err = time.ParseDuration(mss)
130 if err != nil {
131 log.Printf("v1:%v v2:%v v3:%v\n", "ParseDuration conversion error", mss, e)
136 if s := os.Getenv("STOMP_GORNSTR"); s != "" {
137 i, e := strconv.ParseInt(s, 10, 32)
138 if nil != e {
139 log.Printf("v1:%v v2:%v\n", "GORNSTR conversion error", e)
140 } else {
141 gorstr = int(i) // The fixed length to use
145 func runSends(gr int, qn int) {
146 var err error
147 qns := fmt.Sprintf("%d", qn)
148 qname := sngecomm.Dest() + "." + qns
149 sh := stompngo.Headers{"destination", qname}
150 ll.Printf("%stag:%s connsess:%s destination dest:%s\n",
151 exampid, tag, conn.Session(),
152 qname)
153 if senv.Persistent() {
154 sh = sh.Add("persistent", "true")
156 sh = sh.Add(MNHDR, "0")
157 mnhnum := sh.Index(MNHDR)
158 ll.Printf("%stag:%s connsess:%s send headers:%v\n",
159 exampid, tag, conn.Session(),
161 for i := 1; i <= senv.Nmsgs(); i++ {
162 is := fmt.Sprintf("%d", i) // Next message number
163 sh[mnhnum+1] = is // Put message number in headers
164 // Log send headers
165 ll.Printf("%stag:%s connsess:%s main_sending hdrs:%v\n",
166 exampid, tag, conn.Session(),
169 // Handle fixed or variable message length
170 rml := 0
171 if msfl {
172 err = conn.SendBytes(sh, msf)
173 rml = len(msf)
174 } else {
175 // ostr := string(sngecomm.Partial())
176 // err = conn.Send(sh, ostr)
177 oby := sngecomm.Partial()
178 err = conn.SendBytes(sh, oby)
179 rml = len(oby)
181 if err != nil {
182 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
183 exampid, tag, conn.Session(),
184 err.Error()) // Handle this ......
186 ll.Printf("%stag:%s connsess:%s main_send_complete msfl:~%t~len:%d\n",
187 exampid, tag, conn.Session(),
188 msfl, rml)
190 // Handle sleep options
191 if gorsl {
192 if gorslfb {
193 // Fixed time to sleep
194 ll.Printf("%stag:%s connsess:%s main_fixed sleep:~%v\n",
195 exampid, tag, conn.Session(), gorslfx)
196 time.Sleep(gorslfx)
197 } else {
198 // Variable time to sleep
199 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
200 ll.Printf("%stag:%s connsess:%s main_rand sleep:~%v\n",
201 exampid, tag, conn.Session(), dt)
202 time.Sleep(dt)
206 wg.Done() // signal a goroutine completion
209 // Connect to a STOMP broker, publish some messages and disconnect.
210 func main() {
212 if sngecomm.Pprof() {
213 if sngecomm.Cpuprof() != "" {
214 f, err := os.Create(sngecomm.Cpuprof())
215 if err != nil {
216 log.Fatal("could not create CPU profile: ", err)
218 if err := pprof.StartCPUProfile(f); err != nil {
219 log.Fatal("could not start CPU profile: ", err)
221 defer pprof.StopCPUProfile()
225 st := time.Now()
227 // Standard example connect sequence
228 n, conn, e = sngecomm.CommonConnect(exampid, tag, ll)
229 if e != nil {
230 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
231 exampid, tag, sngecomm.Lcs,
232 e.Error()) // Handle this ......
235 rqn := gorstr - 1
236 for i := gorstr; i <= gorstr+ngor-1; i++ {
237 wg.Add(1)
238 rqn++
239 if nqs > 1 && rqn > nqs {
240 rqn = gorstr
242 go runSends(i, rqn)
244 wg.Wait()
246 // Standard example disconnect sequence
247 e = sngecomm.CommonDisconnect(n, conn, exampid, tag, ll)
248 if e != nil {
249 ll.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
250 exampid, tag, conn.Session(),
251 e.Error()) // Handle this ......
254 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
255 exampid, tag, conn.Session(),
256 time.Now().Sub(st))
258 if sngecomm.Pprof() {
259 if sngecomm.Memprof() != "" {
260 f, err := os.Create(sngecomm.Memprof())
261 if err != nil {
262 log.Fatal("could not create memory profile: ", err)
264 runtime.GC() // get up-to-date statistics
265 if err := pprof.WriteHeapProfile(f); err != nil {
266 log.Fatal("could not write memory profile: ", err)
268 f.Close()