2 // Copyright © 2011-2013 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Output can demonstrate different brokers' algorithms for balancing messages across
24 multiple subscriptions to the same queue. In this example all subscriptions
25 share the same connection/session.
27 Prime the queue for this demonstration using publish.go.
31 # Prime a queue with messages:
32 STOMP_PORT=61613 STOMP_NMSGS=10 go run publish.go
34 # Review ActiveMQ balancing characteristics:
35 STOMP_PORT=61613 go run recv_mds.go
37 # Prime a queue with messages again:
38 STOMP_PORT=62613 STOMP_NMSGS=10 go run publish.go
40 # Review Apollo balancing characteristics:
41 STOMP_PORT=62613 go run recv_mds.go
48 "github.com/gmallard/stompngo"
49 "github.com/gmallard/stompngo_examples/sngecomm"
58 exampid
= "recv_mds: "
59 ns
= 4 // Number of subscriptions
61 n net
.Conn
// Network Connection
62 conn
*stompngo
.Connection
// Stomp Connection
63 ack
bool = false // ack mode control
69 id
:= stompngo
.Uuid() // Use package convenience function for unique ID
71 var r
<-chan stompngo
.MessageData
73 r
= sngecomm
.Subscribe(conn
, d
, id
, "client-individual")
75 r
= sngecomm
.Subscribe(conn
, d
, id
, "auto")
79 d
:= <-r
// Read a messagedata struct
83 m
:= d
.Message
.BodyString()
84 fmt
.Println(exampid
, "subnumber", s
, m
, id
)
86 time
.Sleep(1 * time
.Second
)
88 sngecomm
.Ack(conn
, d
.Message
.Headers
, id
)
89 fmt
.Println(exampid
+ "ACK complete ...")
93 wgrecv
.Done() // Never get here, cancel via ^C
96 // Connect to a STOMP broker, receive and ack some messages.
97 // Disconnect never occurs, kill via ^C.
99 fmt
.Println(exampid
, "starts ...")
101 // Set up the connection.
102 h
, port
:= sngecomm
.HostAndPort() //
103 n
, e
:= net
.Dial("tcp", net
.JoinHostPort(h
, port
))
105 log
.Fatalln(e
) // Handle this ......
107 fmt
.Println(exampid
, "dial complete ...")
108 ch
:= sngecomm
.ConnectHeaders()
109 conn
, e
= stompngo
.Connect(n
, ch
)
111 log
.Fatalln(e
) // Handle this ......
113 fmt
.Println(exampid
, "stomp connect complete ...", conn
.Protocol())
115 wgrecv
.Add(ns
) // Number of subscriptions, hard coded in this demonstration
116 for i
:= 1; i
<= ns
; i
++ {
119 fmt
.Println(exampid
, "receivers started ...")
121 wgrecv
.Wait() // This will never complete, use ^C to cancel