2 // Copyright © 2013-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 Publish messages to a STOMP broker.
22 # Publish to a broker with all defaults:
27 # Virtual Host is "localhost"
31 # Publish to a broker using STOMP protocol level 1.0:
32 STOMP_PROTOCOL=1.0 go run publish.go
34 # Publish to a broker using a custom host and port:
35 STOMP_HOST=tjjackson STOMP_PORT=62613 go run publish.go
37 # Publish to a broker using a custom port and virtual host:
38 STOMP_PORT=41613 STOMP_VHOST="/" go run publish.go
40 # Publish to a broker using a custom login and passcode:
41 STOMP_LOGIN="userid" STOMP_PASSCODE="t0ps3cr3t" go run publish.go
43 # Important environment variables for this program are:
45 # STOMP_NGORS - the number of go routines used to write to the
48 # STOMP_NMSGS - the number of messages each go routine will write.
50 # STOMP_NQS - The number of queues to write messages to. If this
51 # variable is absent, the value defaults to the value specified
52 # for STOMP_NGORS. If this value is specified, all go routines
53 # are multi-plexed across this number of queues.
69 "github.com/gmallard/stompngo"
70 // senv methods could be used in general by stompngo clients.
71 "github.com/gmallard/stompngo/senv"
72 // sngecomm methods are used specifically for these example clients.
73 "github.com/gmallard/stompngo_examples/sngecomm"
78 ll
= log
.New(os
.Stdout
, "EPUB ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
81 conn
*stompngo
.Connection
86 // MNHDR is the message number, in message headers
88 gorstr
= 1 // Starting destination number
90 msfl
= true // Fixed message length
91 mslen
= 1024 // The fixed length
96 gorslfx
= time
.Duration(250 * time
.Millisecond
)
99 max
int64 = 1e9
// Max stagger time (nanoseconds)
100 min
= max
/ 10 // Min stagger time (nanoseconds)
107 ngor
= sngecomm
.Ngors()
110 // Options around message length:
112 // 2) randomly variable length
113 if os
.Getenv("STOMP_VARMSL") != "" {
114 msfl
= false // Use randomly variable message lengths
117 if s
:= os
.Getenv("STOMP_FXMSLEN"); s
!= "" {
118 i
, e
:= strconv
.ParseInt(s
, 10, 32)
120 log
.Printf("v1:%v v2:%v\n", "FXMSLEN conversion error", e
)
122 mslen
= int(i
) // The fixed length to use
125 msf
= sngecomm
.PartialSubstr(mslen
)
128 // Options controlling sleeps between message sends. Options are:
130 // 2) Sleep a fixed amount of time
131 // 3) Sleep a random variable amount of time
132 if os
.Getenv("STOMP_DOSLEEP") != "" {
133 gorsl
= true // Do sleep
135 if os
.Getenv("STOMP_FIXSLEEP") != "" {
136 gorslfb
= true // Do a fixed length sleep
140 if s
:= os
.Getenv("STOMP_SLEEPMS"); s
!= "" { // Fixed length milliseconds
141 mss
:= fmt
.Sprintf("%s", s
) + "ms"
142 gorslfx
, err
= time
.ParseDuration(mss
)
144 log
.Printf("v1:%v v2:%v v3:%v\n", "ParseDuration conversion error", mss
, e
)
149 if s
:= os
.Getenv("STOMP_GORNSTR"); s
!= "" {
150 i
, e
:= strconv
.ParseInt(s
, 10, 32)
152 log
.Printf("v1:%v v2:%v\n", "GORNSTR conversion error", e
)
154 gorstr
= int(i
) // The fixed length to use
158 func runSends(gr
int, qn
int) {
160 qns
:= fmt
.Sprintf("%d", qn
)
161 qname
:= sngecomm
.Dest() + "." + qns
162 sh
:= stompngo
.Headers
{"destination", qname
}
163 ll
.Printf("%stag:%s connsess:%s destination dest:%s BEGIN_runSends %d\n",
164 exampid
, tag
, conn
.Session(),
166 if senv
.Persistent() {
167 sh
= sh
.Add("persistent", "true")
169 sh
= sh
.Add(MNHDR
, "0")
170 mnhnum
:= sh
.Index(MNHDR
)
171 ll
.Printf("%stag:%s connsess:%s send headers:%v\n",
172 exampid
, tag
, conn
.Session(),
174 for i
:= 1; i
<= senv
.Nmsgs(); i
++ {
175 is
:= fmt
.Sprintf("%d", i
) // Next message number
176 sh
[mnhnum
+1] = is
// Put message number in headers
178 ll
.Printf("%stag:%s connsess:%s main_sending gr:%d hdrs:%v\n",
179 exampid
, tag
, conn
.Session(),
182 // Handle fixed or variable message length
185 err
= conn
.SendBytes(sh
, msf
)
188 // ostr := string(sngecomm.Partial())
189 // err = conn.Send(sh, ostr)
190 oby
:= sngecomm
.Partial()
191 err
= conn
.SendBytes(sh
, oby
)
195 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
196 exampid
, tag
, conn
.Session(),
197 err
.Error()) // Handle this ......
199 ll
.Printf("%stag:%s connsess:%s main_send_complete gr:%d msfl:~%t~len:%d\n",
200 exampid
, tag
, conn
.Session(),
203 // Handle sleep options
206 // Fixed time to sleep
207 ll
.Printf("%stag:%s connsess:%s gr:%d main_fixed sleep:~%v\n",
208 exampid
, tag
, conn
.Session(), gr
, gorslfx
)
211 // Variable time to sleep
212 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
213 ll
.Printf("%stag:%s connsess:%s gr:%d main_rand sleep:~%v\n",
214 exampid
, tag
, conn
.Session(), gr
, dt
)
219 if sngecomm
.UseEOF() {
220 sh
:= stompngo
.Headers
{"destination", qname
}
221 _
= conn
.Send(sh
, sngecomm
.EOFMsg
)
222 ll
.Printf("%stag:%s connsess:%s gr:%d sent EOF [%s]\n",
223 exampid
, tag
, conn
.Session(), gr
, sngecomm
.EOFMsg
)
225 wg
.Done() // signal a goroutine completion
228 // Connect to a STOMP broker, publish some messages and disconnect.
231 if sngecomm
.Pprof() {
232 if sngecomm
.Cpuprof() != "" {
233 ll
.Printf("%stag:%s connsess:%s CPUPROF %s\n",
234 exampid
, tag
, sngecomm
.Lcs
, sngecomm
.Cpuprof())
235 f
, err
:= os
.Create(sngecomm
.Cpuprof())
237 log
.Fatal("could not create CPU profile: ", err
)
239 if err
:= pprof
.StartCPUProfile(f
); err
!= nil {
240 log
.Fatal("could not start CPU profile: ", err
)
242 defer pprof
.StopCPUProfile()
248 // Standard example connect sequence
249 n
, conn
, e
= sngecomm
.CommonConnect(exampid
, tag
, ll
)
251 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
252 exampid
, tag
, sngecomm
.Lcs
,
253 e
.Error()) // Handle this ......
256 ll
.Printf("%stag:%s connsess:%s START gorstr:%d ngor:%d nqs:%d nmsgs:%d\n",
257 exampid
, tag
, conn
.Session(), gorstr
, ngor
, nqs
, senv
.Nmsgs())
260 for i
:= gorstr
; i
<= gorstr
+ngor
-1; i
++ {
263 if nqs
> 1 && rqn
> nqs
{
270 // Standard example disconnect sequence
271 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, tag
, ll
)
273 ll
.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
274 exampid
, tag
, conn
.Session(),
275 e
.Error()) // Handle this ......
278 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
279 exampid
, tag
, conn
.Session(),
282 if sngecomm
.Pprof() {
283 if sngecomm
.Memprof() != "" {
284 ll
.Printf("%stag:%s connsess:%s MEMPROF %s\n",
285 exampid
, tag
, conn
.Session(), sngecomm
.Memprof())
286 f
, err
:= os
.Create(sngecomm
.Memprof())
288 log
.Fatal("could not create memory profile: ", err
)
290 runtime
.GC() // get up-to-date statistics
291 if err
:= pprof
.WriteHeapProfile(f
); err
!= nil {
292 log
.Fatal("could not write memory profile: ", err
)