Changes to publish.go:
[stompngo_examples.git] / recv_mds / recv_mds.go
blob0a048a0c7ed06e673537637804769ffae9f6fc03
1 //
2 // Copyright © 2011-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Outputs that demonstrate how different brokers balance messages across
24 multiple subscriptions to the same queue (one goroutine per subscription). In
25 this example all subscriptions share the same connection/session. The actual
26 results will likely be slightly surprising. YMMV.
28 Prime the queue for this demonstration using publish.go.
30 Examples:
32 # Prime a queue with messages:
33 STOMP_PORT=61613 STOMP_NMSGS=10 go run publish.go
34 # Review ActiveMQ balancing characteristics. Note:
35 # this will eventually block, and the program will have to be
36 # forcibly stopped.
37 STOMP_PORT=61613 STOMP_ACKMODE="client-individual" go run recv_mds.go
39 # Prime a queue with messages again:
40 STOMP_PORT=62613 STOMP_NMSGS=10 go run publish.go
41 # Review Apollo balancing characteristics. Note:
42 # this will eventually block, and the program will have to be
43 # forcibly stopped.
44 STOMP_PORT=62613 STOMP_ACKMODE="client-individual" go run recv_mds.go
47 package main
49 import (
50 "log"
51 "net"
52 "os"
53 "runtime"
54 "time"
56 "github.com/gmallard/stompngo"
57 // senv methods could be used in general by stompngo clients.
59 // sngecomm methods are used specifically for these example clients.
60 "github.com/gmallard/stompngo_examples/sngecomm"
63 var (
64 exampid = "recv_mds: "
65 ns = 4 // Number of subscriptions
66 n net.Conn // Network Connection
67 conn *stompngo.Connection // Stomp Connection
68 ackMode string = "auto" // ackMode control
69 port string
70 ll = log.New(os.Stdout, "EMDS ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
72 tag = "recvmdsmain"
75 func recv(conn *stompngo.Connection, s int) {
76 ltag := tag + "-recv"
78 ll.Printf("%stag:%s connsess:%s receiver_starts s:%d\n",
79 exampid, ltag, conn.Session(),
82 // Setup Headers ...
83 id := stompngo.Uuid() // Use package convenience function for unique ID
84 d := sngecomm.Dest()
85 ackMode = sngecomm.AckMode() // get ack mode
87 pbc := sngecomm.Pbc() // Print byte count
89 sc := sngecomm.HandleSubscribe(conn, d, id, ackMode)
90 // Receive loop.
91 mc := 0
92 var md stompngo.MessageData
93 for {
94 select {
95 case md = <-sc: // Read a messagedata struct, with a MESSAGE frame
96 case md = <-conn.MessageData: // Read a messagedata struct, with a ERROR/RECEIPT frame
97 // Frames RECEIPT or ERROR not expected here
98 ll.Fatalf("%stag:%s connsess:%s bad_frame md:%v",
99 exampid, ltag, conn.Session(),
100 md) // Handle this ......
103 mc++
104 if md.Error != nil {
105 ll.Fatalf("%stag:%s connsess:%s error_read error:%v",
106 exampid, ltag, conn.Session(),
107 md.Error) // Handle this ......
109 ll.Printf("%stag:%s connsess:%s received_message s:%d id:%s mc:%d hdrs:%v\n",
110 exampid, ltag, conn.Session(),
111 s, id, mc, md.Message.Headers)
112 if pbc > 0 {
113 maxlen := pbc
114 if len(md.Message.Body) < maxlen {
115 maxlen = len(md.Message.Body)
117 ss := string(md.Message.Body[0:maxlen])
118 ll.Printf("%stag:%s connsess:%s payload body:%s\n",
119 exampid, tag, conn.Session(),
123 // time.Sleep(3 * time.Second) // A very arbitrary number
124 // time.Sleep(500 * time.Millisecond) // A very arbitrary number
125 runtime.Gosched()
126 time.Sleep(1500 * time.Millisecond) // A very arbitrary number
127 runtime.Gosched()
128 if ackMode != "auto" {
129 sngecomm.HandleAck(conn, md.Message.Headers, id)
130 ll.Printf("%stag:%s connsess:%s ack_complete s:%d id:%s mc:%d\n",
131 exampid, ltag, conn.Session(),
132 s, id, mc)
134 runtime.Gosched()
138 // Connect to a STOMP broker, receive and ackMode some messages.
139 // Disconnect never occurs, kill via ^C.
140 func main() {
142 // Standard example connect sequence
143 _, conn, e := sngecomm.CommonConnect(exampid, tag, ll)
144 if e != nil {
145 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
146 exampid, tag, sngecomm.Lcs,
147 e.Error()) // Handle this ......
150 for i := 1; i <= ns; i++ {
151 go recv(conn, i)
153 ll.Printf("%stag:%s connsess:%s receivers_started\n",
154 exampid, tag, conn.Session())
156 select {} // This will never complete, use ^C to cancel