2 // Copyright © 2011-2016 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Outputs to demonstrate different broker's algorithms for balancing messages across
24 multiple subscriptions to the same queue. (One go routine per subscription). In
25 this example all subscriptions share the same connection/session. The actual
26 results can / will likely be slightly surprising. YMMV.
28 Prime the queue for this demonstration using publish.go.
32 # Prime a queue with messages:
33 STOMP_PORT=61613 STOMP_NMSGS=10 go run publish.go
34 # Review ActiveMQ balancing characteristics. Note:
35 # this will eventually block, and the program will have to be
37 STOMP_PORT=61613 STOMP_ACKMODE="client-individual" go run recv_mds.go
39 # Prime a queue with messages again:
40 STOMP_PORT=62613 STOMP_NMSGS=10 go run publish.go
41 # Review Apollo balancing characteristics. Note:
42 # this will eventually block, and the program will have to be
44 STOMP_PORT=62613 STOMP_ACKMODE="client-individual" go run recv_mds.go
56 "github.com/gmallard/stompngo"
57 // senv methods could be used in general by stompngo clients.
58 "github.com/gmallard/stompngo/senv"
59 // sngecomm methods are used specifically for these example clients.
60 "github.com/gmallard/stompngo_examples/sngecomm"
64 exampid
= "recv_mds: "
65 ns
= 4 // Number of subscriptions
66 n net
.Conn
// Network Connection
67 conn
*stompngo
.Connection
// Stomp Connection
68 ackMode
string = "auto" // ackMode control
70 ll
= log
.New(os
.Stdout
, "EMDS ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
73 func recv(conn
*stompngo
.Connection
, s
int) {
74 ll
.Printf("%s v1:%v v2:%v v3:%v\n", exampid
, "receiver", s
, "starts")
76 id
:= stompngo
.Uuid() // Use package convenience function for unique ID
78 ackMode
= sngecomm
.AckMode() // get ack mode
80 pbc
:= sngecomm
.Pbc() // Print byte count
82 sc
:= sngecomm
.HandleSubscribe(conn
, d
, id
, ackMode
)
85 var md stompngo
.MessageData
88 case md
= <-sc
: // Read a messagedata struct, with a MESSAGE frame
89 case md
= <-conn
.MessageData
: // Read a messagedata struct, with a ERROR/RECEIPT frame
90 // Unexpected here in this example.
91 ll
.Fatalf("%s v1:%v\n", exampid
, md
) // Handle this
98 ll
.Printf("%s v1:%v v2:%v v3:%v v4:%v\n", exampid
, "subnumber", s
, id
, mc
)
101 if len(md
.Message
.Body
) < maxlen
{
102 maxlen
= len(md
.Message
.Body
)
104 ss
:= string(md
.Message
.Body
[0:maxlen
])
105 ll
.Printf("Payload: %s\n", ss
) // Data payload
108 // time.Sleep(3 * time.Second) // A very arbitrary number
109 // time.Sleep(500 * time.Millisecond) // A very arbitrary number
111 time
.Sleep(1500 * time
.Millisecond
) // A very arbitrary number
113 if ackMode
!= "auto" {
114 sngecomm
.HandleAck(conn
, md
.Message
.Headers
, id
)
115 ll
.Printf("%s v1:%v\n", exampid
, "ACK_complete_...")
121 // Connect to a STOMP broker, receive and ackMode some messages.
122 // Disconnect never occurs, kill via ^C.
124 ll
.Printf("%s v1:%v\n", exampid
, "starts ...")
126 // Set up the connection.
127 h
, p
:= senv
.HostAndPort() //
128 hap
:= net
.JoinHostPort(h
, p
)
129 n
, e
:= net
.Dial("tcp", hap
)
131 ll
.Fatalf("%s %s\n", exampid
, e
.Error()) // Handle this ......
133 ll
.Printf("%s v1:%v v2:%v\n", exampid
, "dial complete ...", hap
)
134 ch
:= sngecomm
.ConnectHeaders()
135 conn
, e
= stompngo
.Connect(n
, ch
)
137 ll
.Fatalf("%s %s\n", exampid
, e
.Error()) // Handle this ......
139 ll
.Printf("%s v1:%v v2:%v\n", exampid
, "stomp connect complete ...", conn
.Protocol())
141 for i
:= 1; i
<= ns
; i
++ {
144 ll
.Printf("%s v1:%v v2:%v\n", exampid
, ns
, "receivers started ...")
146 select {} // This will never complete, use ^C to cancel