2 // Copyright © 2013-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 Publish messages to a STOMP broker.
22 # Publish to a broker with all defaults:
27 # Virtual Host is "localhost"
31 # Publish to a broker using STOMP protocol level 1.0:
32 STOMP_PROTOCOL=1.0 go run publish.go
34 # Publish to a broker using a custom host and port:
35 STOMP_HOST=tjjackson STOMP_PORT=62613 go run publish.go
37 # Publish to a broker using a custom port and virtual host:
38 STOMP_PORT=41613 STOMP_VHOST="/" go run publish.go
40 # Publish to a broker using a custom login and passcode:
41 STOMP_LOGIN="userid" STOMP_PASSCODE="t0ps3cr3t" go run publish.go
43 # Important environment variables for this program are:
45 # STOMP_NGORS - the number of goroutines used to write to the
48 # STOMP_NMSGS - the number of messages each goroutine will write.
50 # STOMP_NQS - the number of queues to write messages to. If this
51 # variable is absent, the value defaults to the value specified
52 # for STOMP_NGORS. If this variable is specified, all goroutines
53 # are multiplexed across this number of queues.
69 "github.com/gmallard/stompngo"
70 // senv functions can be used by stompngo clients in general.
71 "github.com/gmallard/stompngo/senv"
72 // sngecomm functions are used specifically by these example clients.
73 "github.com/gmallard/stompngo_examples/sngecomm"
78 ll
= log
.New(os
.Stdout
, "EPUB ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
81 conn
*stompngo
.Connection
86 // MNHDR is the name of the message header that carries the message number.
88 gorstr
= 1 // Starting destination number
90 msfl
= true // Fixed message length
91 mslen
= 1024 // The fixed length
96 gorslfx
= time
.Duration(250 * time
.Millisecond
)
99 max
int64 = 1e9
// Max stagger time (nanoseconds)
100 min
= max
/ 10 // Min stagger time (nanoseconds)
107 ngor
= sngecomm
.Ngors()
110 // Options around message length:
112 // 2) randomly variable length
113 if os
.Getenv("STOMP_VARMSL") != "" {
114 msfl
= false // Use randomly variable message lengths
117 if s
:= os
.Getenv("STOMP_FXMSLEN"); s
!= "" {
118 i
, e
:= strconv
.ParseInt(s
, 10, 32)
120 log
.Printf("v1:%v v2:%v\n", "FXMSLEN conversion error", e
)
122 mslen
= int(i
) // The fixed length to use
125 msf
= sngecomm
.PartialSubstr(mslen
)
128 // Options controlling sleeps between message sends. Options are:
130 // 2) Sleep a fixed amount of time
131 // 3) Sleep a random variable amount of time
132 if os
.Getenv("STOMP_DOSLEEP") != "" {
133 gorsl
= true // Do sleep
135 if os
.Getenv("STOMP_FIXSLEEP") != "" {
136 gorslfb
= true // Do a fixed length sleep
140 if s
:= os
.Getenv("STOMP_SLEEPMS"); s
!= "" { // Fixed length milliseconds
141 mss
:= fmt
.Sprintf("%s", s
) + "ms"
142 gorslfx
, err
= time
.ParseDuration(mss
)
144 log
.Printf("v1:%v v2:%v v3:%v\n", "ParseDuration conversion error", mss
, e
)
148 // Option controlling destination numbering. Destinations are normally
149 // suffixed with a sequence number, starting at 1. This option allows
150 // that starting sequence number to be arbitrary.
151 if s
:= os
.Getenv("STOMP_GORNSTR"); s
!= "" {
152 i
, e
:= strconv
.ParseInt(s
, 10, 32)
154 log
.Printf("v1:%v v2:%v\n", "GORNSTR conversion error", e
)
156 gorstr
= int(i
) // The starting sequence number to use.
160 func runSends(gr
int, qn
int) {
162 qns
:= fmt
.Sprintf("%d", qn
)
163 qname
:= sngecomm
.Dest() + "." + qns
164 sh
:= stompngo
.Headers
{"destination", qname
}
165 ll
.Printf("%stag:%s connsess:%s destination dest:%s BEGIN_runSends %d\n",
166 exampid
, tag
, conn
.Session(),
168 if senv
.Persistent() {
169 sh
= sh
.Add("persistent", "true")
171 sh
= sh
.Add(MNHDR
, "0")
172 mnhnum
:= sh
.Index(MNHDR
)
173 ll
.Printf("%stag:%s connsess:%s send headers:%v\n",
174 exampid
, tag
, conn
.Session(),
176 for i
:= 1; i
<= senv
.Nmsgs(); i
++ {
177 is
:= fmt
.Sprintf("%d", i
) // Next message number
178 sh
[mnhnum
+1] = is
// Put message number in headers
180 ll
.Printf("%stag:%s connsess:%s main_sending gr:%d hdrs:%v\n",
181 exampid
, tag
, conn
.Session(),
184 // Handle fixed or variable message length
187 err
= conn
.SendBytes(sh
, msf
)
190 // ostr := string(sngecomm.Partial())
191 // err = conn.Send(sh, ostr)
192 oby
:= sngecomm
.Partial()
193 err
= conn
.SendBytes(sh
, oby
)
197 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
198 exampid
, tag
, conn
.Session(),
199 err
.Error()) // Handle this ......
201 ll
.Printf("%stag:%s connsess:%s main_send_complete gr:%d msfl:~%t~len:%d\n",
202 exampid
, tag
, conn
.Session(),
205 // Handle sleep options
208 // Fixed time to sleep
209 ll
.Printf("%stag:%s connsess:%s gr:%d main_fixed sleep:~%v\n",
210 exampid
, tag
, conn
.Session(), gr
, gorslfx
)
213 // Variable time to sleep
214 dt
:= time
.Duration(sngecomm
.ValueBetween(min
, max
, sf
))
215 ll
.Printf("%stag:%s connsess:%s gr:%d main_rand sleep:~%v\n",
216 exampid
, tag
, conn
.Session(), gr
, dt
)
221 if sngecomm
.UseEOF() {
222 sh
:= stompngo
.Headers
{"destination", qname
}
223 _
= conn
.Send(sh
, sngecomm
.EOFMsg
)
224 ll
.Printf("%stag:%s connsess:%s gr:%d sent EOF [%s]\n",
225 exampid
, tag
, conn
.Session(), gr
, sngecomm
.EOFMsg
)
227 wg
.Done() // signal a goroutine completion
230 // Connect to a STOMP broker, publish some messages and disconnect.
233 if sngecomm
.Pprof() {
234 if sngecomm
.Cpuprof() != "" {
235 ll
.Printf("%stag:%s connsess:%s CPUPROF %s\n",
236 exampid
, tag
, sngecomm
.Lcs
, sngecomm
.Cpuprof())
237 f
, err
:= os
.Create(sngecomm
.Cpuprof())
239 log
.Fatal("could not create CPU profile: ", err
)
241 if err
:= pprof
.StartCPUProfile(f
); err
!= nil {
242 log
.Fatal("could not start CPU profile: ", err
)
244 defer pprof
.StopCPUProfile()
250 // Standard example connect sequence
251 n
, conn
, e
= sngecomm
.CommonConnect(exampid
, tag
, ll
)
253 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
254 exampid
, tag
, sngecomm
.Lcs
,
255 e
.Error()) // Handle this ......
258 ll
.Printf("%stag:%s connsess:%s START gorstr:%d ngor:%d nqs:%d nmsgs:%d\n",
259 exampid
, tag
, conn
.Session(), gorstr
, ngor
, nqs
, senv
.Nmsgs())
262 for i
:= gorstr
; i
<= gorstr
+ngor
-1; i
++ {
265 if nqs
> 1 && rqn
> nqs
{
272 // Standard example disconnect sequence
273 e
= sngecomm
.CommonDisconnect(n
, conn
, exampid
, tag
, ll
)
275 ll
.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
276 exampid
, tag
, conn
.Session(),
277 e
.Error()) // Handle this ......
280 ll
.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
281 exampid
, tag
, conn
.Session(),
284 if sngecomm
.Pprof() {
285 if sngecomm
.Memprof() != "" {
286 ll
.Printf("%stag:%s connsess:%s MEMPROF %s\n",
287 exampid
, tag
, conn
.Session(), sngecomm
.Memprof())
288 f
, err
:= os
.Create(sngecomm
.Memprof())
290 log
.Fatal("could not create memory profile: ", err
)
292 runtime
.GC() // get up-to-date statistics
293 if err
:= pprof
.WriteHeapProfile(f
); err
!= nil {
294 log
.Fatal("could not write memory profile: ", err
)