2 // Copyright © 2013-2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 Publish messages to a STOMP broker.
22 # Publish to a broker with all defaults:
27 # Virtual Host is "localhost"
31 # Publish to a broker using STOMP protocol level 1.0:
32 STOMP_PROTOCOL=1.0 go run publish.go
34 # Publish to a broker using a custom host and port:
35 STOMP_HOST=tjjackson STOMP_PORT=62613 go run publish.go
37 # Publish to a broker using a custom port and virtual host:
38 STOMP_PORT=41613 STOMP_VHOST="/" go run publish.go
40 # Publish to a broker using a custom login and passcode:
41 STOMP_LOGIN="userid" STOMP_PASSCODE="t0ps3cr3t" go run publish.go
57 "github.com/gmallard/stompngo"
58 // senv methods could be used in general by stompngo clients.
59 "github.com/gmallard/stompngo/senv"
60 // sngecomm methods are used specifically for these example clients.
61 "github.com/gmallard/stompngo_examples/sngecomm"
66 ll
= log
.New(os
.Stdout
, "EPUB ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
69 conn
*stompngo
.Connection
75 gorstr
= 1 // Starting destination number
77 msfl
= true // Fixed message length
78 mslen
= 1024 // The fixed length
83 gorslfx
= time
.Duration(250 * time
.Millisecond
)
86 max
int64 = 1e9
// Max stagger time (nanoseconds)
87 min
int64 = max
/ 10 // Min stagger time (nanoseconds)
94 ngor
= sngecomm
.Ngors()
97 // Options around message length:
99 // 2) randomly variable length
100 if os
.Getenv("STOMP_VARMSL") != "" {
101 msfl
= false // Use randomly variable message lengths
104 if s
:= os
.Getenv("STOMP_FXMSLEN"); s
!= "" {
105 i
, e
:= strconv
.ParseInt(s
, 10, 32)
107 log
.Printf("v1:%v v2:%v\n", "FXMSLEN conversion error", e
)
109 mslen
= int(i
) // The fixed length to use
112 msf
= sngecomm
.PartialSubstr(mslen
)
115 // Options controlling sleeps between message sends. Options are:
117 // 2) Sleep a fixed amount of time
118 // 3) Sleep a random variable amount of time
119 if os
.Getenv("STOMP_DOSLEEP") != "" {
120 gorsl
= true // Do sleep
122 if os
.Getenv("STOMP_FIXSLEEP") != "" {
123 gorslfb
= true // Do a fixed length sleep
127 if s
:= os
.Getenv("STOMP_SLEEPMS"); s
!= "" { // Fixed length milliseconds
128 mss
:= fmt
.Sprintf("%s", s
) + "ms"
129 gorslfx
, err
= time
.ParseDuration(mss
)
131 log
.Printf("v1:%v v2:%v v3:%v\n", "ParseDuration conversion error", mss
, e
)
136 if s
:= os
.Getenv("STOMP_GORNSTR"); s
!= "" {
137 i
, e
:= strconv
.ParseInt(s
, 10, 32)
139 log
.Printf("v1:%v v2:%v\n", "GORNSTR conversion error", e
)
141 gorstr
= int(i
) // The starting destination number to use
145 func runSends(gr
int, qn
int) {
147 qns
:= fmt
.Sprintf("%d", qn
)
148 qname
:= sngecomm
.Dest() + "." + qns
149 sh
:= stompngo
.Headers
{"destination", qname
}
150 ll
.Printf("%stag:%s connsess:%s destination dest:%s\n",
151 exampid
, tag
, conn
.Session(),
153 if senv
.Persistent() {
154 sh
= sh
.Add("persistent", "true")
156 sh
= sh
.Add(MNHDR
, "0")
157 mnhnum
:= sh
.Index(MNHDR
)
158 ll
.Printf("%stag:%s connsess:%s send headers:%v\n",
159 exampid
, tag
, conn
.Session(),
161 for i
:= 1; i
<= senv
.Nmsgs(); i
++ {
162 is
:= fmt
.Sprintf("%d", i
) // Next message number
163 sh
[mnhnum
+1] = is
// Put message number in headers
165 ll
.Printf("%stag:%s connsess:%s main_sending hdrs:%v\n",
166 exampid
, tag
, conn
.Session(),
169 // Handle fixed or variable message length
172 err
= conn
.SendBytes(sh
, msf
)
175 // ostr := string(sngecomm.Partial())
176 // err = conn.Send(sh, ostr)
177 oby
:= sngecomm
.Partial()
178 err
= conn
.SendBytes(sh
, oby
)
182 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
183 exampid
, tag
, conn
.Session(),
184 err
.Error()) // Handle this ......
186 ll
.Printf("%stag:%s connsess:%s main_send_complete msfl:~%t~len:%d\n",
187 exampid
, tag
, conn
.Session(),
190 // Handle sleep options
193 // Fixed time to sleep
194 ll
.Printf("%stag:%s connsess:%s main_fixed sleep:~%v\n",
195 exampid
, tag
, conn
.Session(), gorslfx
)
198 // Variable time to sleep
199 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
200 ll
.Printf("%stag:%s connsess:%s main_rand sleep:~%v\n",
201 exampid
, tag
, conn
.Session(), dt
)
206 wg
.Done() // signal a goroutine completion
209 // Connect to a STOMP broker, publish some messages and disconnect.
212 if sngecomm
.Pprof() {
213 if sngecomm
.Cpuprof() != "" {
214 f
, err
:= os
.Create(sngecomm
.Cpuprof())
216 log
.Fatal("could not create CPU profile: ", err
)
218 if err
:= pprof
.StartCPUProfile(f
); err
!= nil {
219 log
.Fatal("could not start CPU profile: ", err
)
221 defer pprof
.StopCPUProfile()
227 // Standard example connect sequence
228 n
, conn
, e
= sngecomm
.CommonConnect(exampid
, tag
, ll
)
230 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
231 exampid
, tag
, sngecomm
.Lcs
,
232 e
.Error()) // Handle this ......
236 for i
:= gorstr
; i
<= gorstr
+ngor
-1; i
++ {
239 if nqs
> 1 && rqn
> nqs
{
246 // Standard example disconnect sequence
247 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, tag
, ll
)
249 ll
.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
250 exampid
, tag
, conn
.Session(),
251 e
.Error()) // Handle this ......
254 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
255 exampid
, tag
, conn
.Session(),
258 if sngecomm
.Pprof() {
259 if sngecomm
.Memprof() != "" {
260 f
, err
:= os
.Create(sngecomm
.Memprof())
262 log
.Fatal("could not create memory profile: ", err
)
264 runtime
.GC() // get up-to-date statistics
265 if err
:= pprof
.WriteHeapProfile(f
); err
!= nil {
266 log
.Fatal("could not write memory profile: ", err
)