2 // Copyright © 2011-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Outputs to demonstrate different broker's algorithms for balancing messages across
24 multiple subscriptions to the same queue. (One go routine per subscription). In
25 this example all subscriptions share the same connection/session. The actual
26 results can / will likely be slightly surprising. YMMV.
28 Prime the queue for this demonstration using publish.go.
32 # Prime a queue with messages:
33 STOMP_PORT=61613 STOMP_NMSGS=10 go run publish.go
34 # Review ActiveMQ balancing characteristics. Note:
35 # this will eventually block, and the program will have to be
37 STOMP_PORT=61613 STOMP_ACKMODE="client-individual" go run recv_mds.go
39 # Prime a queue with messages again:
40 STOMP_PORT=62613 STOMP_NMSGS=10 go run publish.go
41 # Review Apollo balancing characteristics. Note:
42 # this will eventually block, and the program will have to be
44 STOMP_PORT=62613 STOMP_ACKMODE="client-individual" go run recv_mds.go
56 "github.com/gmallard/stompngo"
57 // senv methods could be used in general by stompngo clients.
59 // sngecomm methods are used specifically for these example clients.
60 "github.com/gmallard/stompngo_examples/sngecomm"
64 exampid
= "recv_mds: "
65 ns
= 4 // Number of subscriptions
66 n net
.Conn
// Network Connection
67 conn
*stompngo
.Connection
// Stomp Connection
68 ackMode
string = "auto" // ackMode control
70 ll
= log
.New(os
.Stdout
, "EMDS ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
75 func recv(conn
*stompngo
.Connection
, s
int) {
78 ll
.Printf("%stag:%s connsess:%s receiver_starts s:%d\n",
79 exampid
, ltag
, conn
.Session(),
83 id
:= stompngo
.Uuid() // Use package convenience function for unique ID
85 ackMode
= sngecomm
.AckMode() // get ack mode
87 pbc
:= sngecomm
.Pbc() // Print byte count
89 sc
:= sngecomm
.HandleSubscribe(conn
, d
, id
, ackMode
)
92 var md stompngo
.MessageData
95 case md
= <-sc
: // Read a messagedata struct, with a MESSAGE frame
96 case md
= <-conn
.MessageData
: // Read a messagedata struct, with a ERROR/RECEIPT frame
97 // Frames RECEIPT or ERROR not expected here
98 ll
.Fatalf("%stag:%s connsess:%s bad_frame md:%v",
99 exampid
, ltag
, conn
.Session(),
100 md
) // Handle this ......
105 ll
.Fatalf("%stag:%s connsess:%s error_read error:%v",
106 exampid
, ltag
, conn
.Session(),
107 md
.Error
) // Handle this ......
109 ll
.Printf("%stag:%s connsess:%s received_message s:%d id:%s mc:%d hdrs:%v\n",
110 exampid
, ltag
, conn
.Session(),
111 s
, id
, mc
, md
.Message
.Headers
)
114 if len(md
.Message
.Body
) < maxlen
{
115 maxlen
= len(md
.Message
.Body
)
117 ss
:= string(md
.Message
.Body
[0:maxlen
])
118 ll
.Printf("%stag:%s connsess:%s payload body:%s\n",
119 exampid
, tag
, conn
.Session(),
123 // time.Sleep(3 * time.Second) // A very arbitrary number
124 // time.Sleep(500 * time.Millisecond) // A very arbitrary number
126 time
.Sleep(1500 * time
.Millisecond
) // A very arbitrary number
128 if ackMode
!= "auto" {
129 sngecomm
.HandleAck(conn
, md
.Message
.Headers
, id
)
130 ll
.Printf("%stag:%s connsess:%s ack_complete s:%d id:%s mc:%d\n",
131 exampid
, ltag
, conn
.Session(),
138 // Connect to a STOMP broker, receive and ackMode some messages.
139 // Disconnect never occurs, kill via ^C.
142 // Standard example connect sequence
143 _
, conn
, e
:= sngecomm
.CommonConnect(exampid
, tag
, ll
)
145 ll
.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
146 exampid
, tag
, sngecomm
.Lcs
,
147 e
.Error()) // Handle this ......
150 for i
:= 1; i
<= ns
; i
++ {
153 ll
.Printf("%stag:%s connsess:%s receivers_started\n",
154 exampid
, tag
, conn
.Session())
156 select {} // This will never complete, use ^C to cancel