2 // Copyright © 2011-2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 Package sngecomm provides common functionality used in the stompngo_examples
35 "github.com/gmallard/stompngo"
39 host
= "localhost" // default host
40 port
= "61613" // default port
41 protocol
= "1.1" // Default protocol level
42 login
= "guest" // Default login
43 passcode
= "guest" // Default passcode
46 nmsgs
= 1 // Default number of messages to send
47 dest
= "/queue/snge.common.queue" // Default destination
48 nqs
= 1 // Default number of queues for multi-queue demo(s)
49 scc
= 1 // Default subscribe channel capacity
50 mdml
= 1024 * 32 // Message data max length of variable message, 32K
51 md
= make([]byte, 1) // Additional message data, primed during init()
52 rc
= 1 // Receiver connection count, srmgor_1smrconn
53 pbc
= 64 // Number of bytes to print (used in some
54 // // examples that receive).
56 sendFact
float64 = 1.0 // Send sleep time factor
57 recvFact
float64 = 1.0 // Receive sleep time factor
59 conn2Buffer
int = -1 // 2 connection buffer. < 0 means use queue size.
61 ackMode
= "auto" // The default ack mode
63 pprof
= false // Do not do profiling
64 hbparms
= "" // No heartbeats
69 p
:= "_123456789ABCDEF"
72 md
= bytes
.Repeat(b
, c
) // A long string
74 if s
:= os
.Getenv("STOMP_SENDFACT"); s
!= "" {
75 f
, e
:= strconv
.ParseFloat(s
, 64)
81 if s
:= os
.Getenv("STOMP_RECVFACT"); s
!= "" {
82 f
, e
:= strconv
.ParseFloat(s
, 64)
88 if s
:= os
.Getenv("STOMP_CONN2BUFFER"); s
!= "" {
89 s
, e
:= strconv
.ParseInt(s
, 10, 32)
95 if am
:= os
.Getenv("STOMP_ACKMODE"); am
!= "" {
96 if am
== "auto" || am
== "client" || am
== "client-individual" {
101 if am
:= os
.Getenv("STOMP_PPROF"); am
!= "" {
105 if s
:= os
.Getenv("STOMP_MDML"); s
!= "" {
106 i
, e
:= strconv
.ParseInt(s
, 10, 32)
112 if s
:= os
.Getenv("STOMP_RECVCONNS"); s
!= "" {
113 i
, e
:= strconv
.ParseInt(s
, 10, 32)
119 if hbp
:= os
.Getenv("STOMP_HBPARMS"); hbp
!= "" {
123 if s
:= os
.Getenv("STOMP_PBC"); s
!= "" {
124 i
, e
:= strconv
.ParseInt(s
, 10, 32)
131 // Receiver connection count
132 func Recvconns() int {
136 // Max Data Message Length
141 // Use profiling or not
146 // ACK mode for those examples that use it.
147 func AckMode() string {
151 // 2 Connection Buffer Size
152 func Conn2Buffer() int {
156 // Timestamp example ids
157 func ExampIdNow(s
string) string {
158 return time
.Now().String() + " " + s
161 // Get Send Sleep Factor
162 func SendFactor() float64 {
166 // Get Recv Sleep Factor
167 func RecvFactor() float64 {
171 // Get partial string, random length
172 func Partial() []byte {
173 r
:= int(ValueBetween(1, int64(mdml
-1), 1.0))
177 // Override default protocol level
178 func Protocol() string {
179 p
:= os
.Getenv("STOMP_PROTOCOL")
191 // Override Host and port for Dial if requested.
192 func HostAndPort() (string, string) {
193 he
:= os
.Getenv("STOMP_HOST")
197 pe
:= os
.Getenv("STOMP_PORT")
205 func Login() string {
206 l
:= os
.Getenv("STOMP_LOGIN")
217 func Passcode() string {
218 p
:= os
.Getenv("STOMP_PASSCODE")
228 // Provide connect headers
229 func ConnectHeaders() stompngo
.Headers
{
230 h
:= stompngo
.Headers
{}
233 h
= h
.Add("login", l
)
237 h
= h
.Add("passcode", pc
)
241 if p
!= stompngo
.SPL_10
{ // 1.1 and 1.2
242 h
= h
.Add("accept-version", p
).Add("host", Vhost())
247 h
= h
.Add("heart-beat", hb
)
253 // Number of messages to send
255 c
:= os
.Getenv("STOMP_NMSGS")
259 n
, e
:= strconv
.ParseInt(c
, 10, 0)
261 fmt
.Printf("NMSGS Conversion error: %v\n", e
)
267 // Number of queues to use
269 c
:= os
.Getenv("STOMP_NQS")
273 n
, e
:= strconv
.ParseInt(c
, 10, 0)
275 fmt
.Printf("NQS Conversion error: %v\n", e
)
281 // Subscribe Channel Capacity
282 func SubChanCap() int {
283 c
:= os
.Getenv("STOMP_SUBCHANCAP")
287 n
, e
:= strconv
.ParseInt(c
, 10, 0)
289 fmt
.Printf("SUBCHANCAP Conversion error: %v\n", e
)
295 // Destination to send to
297 d
:= os
.Getenv("STOMP_DEST")
304 // Does receive wait to simulate message processing
305 func RecvWait() bool {
306 f
:= os
.Getenv("STOMP_NORECVW")
313 // Does send wait to simulate message building
314 func SendWait() bool {
315 f
:= os
.Getenv("STOMP_NOSENDW")
322 // True if persistent messages are desired.
323 func Persistent() bool {
324 f
:= os
.Getenv("STOMP_PERSISTENT")
331 // True if max procs are to be set
332 func SetMAXPROCS() bool {
333 f
:= os
.Getenv("STOMP_SETMAXPROCS")
340 // Virtual Host Name to use
341 func Vhost() string {
342 d
:= os
.Getenv("STOMP_VHOST")
350 func HbParms() string {
354 // Show connection metrics.
355 func ShowStats(exampid
, tag
string, conn
*stompngo
.Connection
) {
356 r
:= conn
.FramesRead()
357 br
:= conn
.BytesRead()
358 w
:= conn
.FramesWritten()
359 bw
:= conn
.BytesWritten()
360 s
:= conn
.Running().Seconds()
361 n
:= conn
.Running().Nanoseconds()
362 fmt
.Println(ExampIdNow(exampid
), tag
, "frame read count", r
)
363 fmt
.Println(ExampIdNow(exampid
), tag
, "bytes read", br
)
364 fmt
.Println(ExampIdNow(exampid
), tag
, "frame write count", w
)
365 fmt
.Println(ExampIdNow(exampid
), tag
, "bytes written", bw
)
366 fmt
.Println(ExampIdNow(exampid
), tag
, "current duration(ns)", n
)
367 fmt
.Printf("%s %s %s %20.6f\n", ExampIdNow(exampid
), tag
, "current duration(sec)", s
)
368 fmt
.Printf("%s %s %s %20.6f\n", ExampIdNow(exampid
), tag
, "frame reads/sec", float64(r
)/s
)
369 fmt
.Printf("%s %s %s %20.6f\n", ExampIdNow(exampid
), tag
, "bytes read/sec", float64(br
)/s
)
370 fmt
.Printf("%s %s %s %20.6f\n", ExampIdNow(exampid
), tag
, "frame writes/sec", float64(w
)/s
)
371 fmt
.Printf("%s %s %s %20.6f\n", ExampIdNow(exampid
), tag
, "bytes written/sec", float64(bw
)/s
)
374 // Get a value between min amd max
375 func ValueBetween(min
, max
int64, fact
float64) int64 {
376 rt
, _
:= rand
.Int(rand
.Reader
, big
.NewInt(max
-min
)) // Ignore errors here
377 return int64(fact
* float64(min
+rt
.Int64()))
380 // Dump a TLS Configuration Struct
381 func DumpTLSConfig(exampid
string, c
*tls
.Config
, n
*tls
.Conn
) {
383 fmt
.Printf("%s Rand: %v\n", ExampIdNow(exampid
), c
.Rand
)
384 fmt
.Printf("%s Time: %v\n", ExampIdNow(exampid
), c
.Time
)
385 fmt
.Printf("%s Certificates: %v\n", ExampIdNow(exampid
), c
.Certificates
)
386 fmt
.Printf("%s NameToCertificate: %v\n", ExampIdNow(exampid
), c
.NameToCertificate
)
387 fmt
.Printf("%s RootCAs: %v\n", ExampIdNow(exampid
), c
.RootCAs
)
388 fmt
.Printf("%s NextProtos: %v\n", ExampIdNow(exampid
), c
.NextProtos
)
389 fmt
.Printf("%s ServerName: %v\n", ExampIdNow(exampid
), c
.ServerName
)
390 fmt
.Printf("%s ClientAuth: %v\n", ExampIdNow(exampid
), c
.ClientAuth
)
391 fmt
.Printf("%s ClientCAs: %v\n", ExampIdNow(exampid
), c
.ClientCAs
)
392 fmt
.Printf("%s CipherSuites: %v\n", ExampIdNow(exampid
), c
.CipherSuites
)
393 fmt
.Printf("%s PreferServerCipherSuites: %v\n", ExampIdNow(exampid
), c
.PreferServerCipherSuites
)
394 fmt
.Printf("%s SessionTicketsDisabled: %v\n", ExampIdNow(exampid
), c
.SessionTicketsDisabled
)
395 fmt
.Printf("%s SessionTicketKey: %v\n", ExampIdNow(exampid
), c
.SessionTicketKey
)
397 // Idea Embellished From:
398 // https://groups.google.com/forum/#!topic/golang-nuts/TMNdOxugbTY
399 cs
:= n
.ConnectionState()
400 fmt
.Println(ExampIdNow(exampid
), "HandshakeComplete:", cs
.HandshakeComplete
)
401 fmt
.Println(ExampIdNow(exampid
), "DidResume:", cs
.DidResume
)
402 fmt
.Printf("%s %s %d(0x%X)\n", ExampIdNow(exampid
), "CipherSuite:", cs
.CipherSuite
, cs
.CipherSuite
)
403 fmt
.Println(ExampIdNow(exampid
), "NegotiatedProtocol:", cs
.NegotiatedProtocol
)
404 fmt
.Println(ExampIdNow(exampid
), "NegotiatedProtocolIsMutual:", cs
.NegotiatedProtocolIsMutual
)
405 fmt
.Println(ExampIdNow(exampid
), "ServerName:", cs
.ServerName
)
406 // Portions of any Peer Certificates present
407 certs
:= cs
.PeerCertificates
408 if certs
== nil ||
len(certs
) < 1 {
409 fmt
.Println("Could not get server's certificate from the TLS connection.")
413 fmt
.Println(ExampIdNow(exampid
), "Server Certs:")
414 for i
, cert
:= range certs
{
415 fmt
.Printf("Certificate chain: %d\n", i
)
416 fmt
.Printf("Common Name:%s \n", cert
.Subject
.CommonName
)
418 fmt
.Printf("Subject Alternative Names (DNSNames):\n")
419 for idx
, dnsn
:= range cert
.DNSNames
{
420 fmt
.Printf("\tNumber: %d, DNS Name: %s\n", idx
+1, dnsn
)
423 fmt
.Printf("Subject Alternative Names (Emailaddresses):\n")
424 for idx
, enn
:= range cert
.EmailAddresses
{
425 fmt
.Printf("\tNumber: %d, DNS Name: %s\n", idx
+1, enn
)
428 fmt
.Printf("Subject Alternative Names (IPAddresses):\n")
429 for idx
, ipadn
:= range cert
.IPAddresses
{
430 fmt
.Printf("\tNumber: %d, DNS Name: %v\n", idx
+1, ipadn
)
433 fmt
.Printf("Valid Not Before: %s\n", cert
.NotBefore
.Local().String())
434 fmt
.Printf("Valid Not After: %s\n", cert
.NotAfter
.Local().String())
435 fmt
.Println("" + strings
.Repeat("=", 80) + "\n")
441 // Handle a subscribe for the different protocol levels.
442 func Subscribe(c
*stompngo
.Connection
, d
, i
, a
string) <-chan stompngo
.MessageData
{
443 h
:= stompngo
.Headers
{"destination", d
, "ack", a
}
445 switch c
.Protocol() {
446 case stompngo
.SPL_12
:
447 // Add required id header
449 case stompngo
.SPL_11
:
450 // Add required id header
452 case stompngo
.SPL_10
:
453 // Nothing else to do here
455 log
.Fatalln("subscribe invalid protocol level, should not happen")
458 r
, e
:= c
.Subscribe(h
)
460 log
.Fatalln("subscribe failed", e
)
465 // Handle a unsubscribe for the different protocol levels.
466 func Unsubscribe(c
*stompngo
.Connection
, d
, i
string) {
467 h
:= stompngo
.Headers
{}
469 switch c
.Protocol() {
470 case stompngo
.SPL_12
:
472 case stompngo
.SPL_11
:
474 case stompngo
.SPL_10
:
475 h
= h
.Add("destination", d
)
477 log
.Fatalln("unsubscribe invalid protocol level, should not happen")
479 e
:= c
.Unsubscribe(h
)
481 log
.Fatalln("unsubscribe failed", e
)
486 // Handle ACKs for the different protocol levels.
487 func Ack(c
*stompngo
.Connection
, h stompngo
.Headers
, id
string) {
488 ah
:= stompngo
.Headers
{}
490 switch c
.Protocol() {
491 case stompngo
.SPL_12
:
492 ah
= ah
.Add("id", h
.Value("ack"))
493 case stompngo
.SPL_11
:
494 ah
= ah
.Add("message-id", h
.Value("message-id")).Add("subscription", id
)
495 case stompngo
.SPL_10
:
496 ah
= ah
.Add("message-id", h
.Value("message-id"))
498 log
.Fatalln("unsubscribe invalid protocol level, should not happen")
502 log
.Fatalln("ack failed", e
, c
.Protocol())
507 func ShowRunParms(exampid
string) {
508 fmt
.Println(ExampIdNow(exampid
), "HOST", os
.Getenv("STOMP_HOST"), "alt", host
)
509 fmt
.Println(ExampIdNow(exampid
), "PORT", os
.Getenv("STOMP_PORT"), "alt", port
)
510 fmt
.Println(ExampIdNow(exampid
), "PROTOCOL", Protocol())
511 fmt
.Println(ExampIdNow(exampid
), "VHOST", Vhost())
512 fmt
.Println(ExampIdNow(exampid
), "NQS", Nqs())
513 fmt
.Println(ExampIdNow(exampid
), "NMSGS", Nmsgs())
514 fmt
.Println(ExampIdNow(exampid
), "SUBCHANCAP", SubChanCap())
515 fmt
.Println(ExampIdNow(exampid
), "RECVFACT", RecvFactor())
516 fmt
.Println(ExampIdNow(exampid
), "SENDFACT", SendFactor())
517 fmt
.Println(ExampIdNow(exampid
), "CON2BUFFER", Conn2Buffer())
518 fmt
.Println(ExampIdNow(exampid
), "ACKMODE", AckMode())
519 fmt
.Println(ExampIdNow(exampid
), "RECVCONNS", Recvconns())