JMS interop examples updated for common code / logging.
[stompngo_examples.git] / recv_mds / recv_mds.go
blob352ea2aee3f49cf6d3555d67432ac04fb22a6d26
1 //
2 // Copyright © 2011-2016 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Outputs to demonstrate different broker's algorithms for balancing messages across
24 multiple subscriptions to the same queue. (One go routine per subscription). In
25 this example all subscriptions share the same connection/session. The actual
26 results can / will likely be slightly surprising. YMMV.
28 Prime the queue for this demonstration using publish.go.
30 Examples:
32 # Prime a queue with messages:
33 STOMP_PORT=61613 STOMP_NMSGS=10 go run publish.go
34 # Review ActiveMQ balancing characteristics. Note:
35 # this will eventually block, and the program will have to be
36 # forcibly stopped.
37 STOMP_PORT=61613 STOMP_ACKMODE="client-individual" go run recv_mds.go
39 # Prime a queue with messages again:
40 STOMP_PORT=62613 STOMP_NMSGS=10 go run publish.go
41 # Review Apollo balancing characteristics. Note:
42 # this will eventually block, and the program will have to be
43 # forcibly stopped.
44 STOMP_PORT=62613 STOMP_ACKMODE="client-individual" go run recv_mds.go
47 package main
49 import (
50 "log"
51 "net"
52 "os"
53 "runtime"
54 "time"
56 "github.com/gmallard/stompngo"
57 // senv methods could be used in general by stompngo clients.
58 "github.com/gmallard/stompngo/senv"
59 // sngecomm methods are used specifically for these example clients.
60 "github.com/gmallard/stompngo_examples/sngecomm"
63 var (
64 exampid = "recv_mds: "
65 ns = 4 // Number of subscriptions
66 n net.Conn // Network Connection
67 conn *stompngo.Connection // Stomp Connection
68 ackMode string = "auto" // ackMode control
69 port string
70 ll = log.New(os.Stdout, "EMDS ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
73 func recv(conn *stompngo.Connection, s int) {
74 ll.Printf("%s v1:%v v2:%v v3:%v\n", exampid, "receiver", s, "starts")
75 // Setup Headers ...
76 id := stompngo.Uuid() // Use package convenience function for unique ID
77 d := senv.Dest()
78 ackMode = sngecomm.AckMode() // get ack mode
80 pbc := sngecomm.Pbc() // Print byte count
82 sc := sngecomm.HandleSubscribe(conn, d, id, ackMode)
83 // Receive loop.
84 mc := 0
85 var md stompngo.MessageData
86 for {
87 select {
88 case md = <-sc: // Read a messagedata struct, with a MESSAGE frame
89 case md = <-conn.MessageData: // Read a messagedata struct, with a ERROR/RECEIPT frame
90 // Unexpected here in this example.
91 ll.Fatalf("%s v1:%v\n", exampid, md) // Handle this
94 mc++
95 if md.Error != nil {
96 panic(md.Error)
98 ll.Printf("%s v1:%v v2:%v v3:%v v4:%v\n", exampid, "subnumber", s, id, mc)
99 if pbc > 0 {
100 maxlen := pbc
101 if len(md.Message.Body) < maxlen {
102 maxlen = len(md.Message.Body)
104 ss := string(md.Message.Body[0:maxlen])
105 ll.Printf("Payload: %s\n", ss) // Data payload
108 // time.Sleep(3 * time.Second) // A very arbitrary number
109 // time.Sleep(500 * time.Millisecond) // A very arbitrary number
110 runtime.Gosched()
111 time.Sleep(1500 * time.Millisecond) // A very arbitrary number
112 runtime.Gosched()
113 if ackMode != "auto" {
114 sngecomm.HandleAck(conn, md.Message.Headers, id)
115 ll.Printf("%s v1:%v\n", exampid, "ACK_complete_...")
117 runtime.Gosched()
121 // Connect to a STOMP broker, receive and ackMode some messages.
122 // Disconnect never occurs, kill via ^C.
123 func main() {
124 ll.Printf("%s v1:%v\n", exampid, "starts ...")
126 // Set up the connection.
127 h, p := senv.HostAndPort() //
128 hap := net.JoinHostPort(h, p)
129 n, e := net.Dial("tcp", hap)
130 if e != nil {
131 ll.Fatalf("%s %s\n", exampid, e.Error()) // Handle this ......
133 ll.Printf("%s v1:%v v2:%v\n", exampid, "dial complete ...", hap)
134 ch := sngecomm.ConnectHeaders()
135 conn, e = stompngo.Connect(n, ch)
136 if e != nil {
137 ll.Fatalf("%s %s\n", exampid, e.Error()) // Handle this ......
139 ll.Printf("%s v1:%v v2:%v\n", exampid, "stomp connect complete ...", conn.Protocol())
141 for i := 1; i <= ns; i++ {
142 go recv(conn, i)
144 ll.Printf("%s v1:%v v2:%v\n", exampid, ns, "receivers started ...")
146 select {} // This will never complete, use ^C to cancel