TLS examples use unified common functionality.
[stompngo_examples.git] / recv_mds / recv_mds.go
blobdb155e1db596f571b3606a30ad337b85e610b97c
1 //
2 // Copyright © 2011-2013 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
20 // All senders and receivers use the same Stomp connection.
23 Output can demonstrate different broker's algorithms for balancing messages across
24 multiple subscriptions to the same queue. In this example all subscriptions
25 share the same connection/session.
27 Prime the queue for this demonstration using publish.go.
29 Examples:
31 # Prime a queue with messages:
32 STOMP_PORT=61613 STOMP_NMSGS=10 go run publish.go
34 # Review ActiveMQ balancing characteristics:
35 STOMP_PORT=61613 go run recv_mds.go
37 # Prime a queue with messages again:
38 STOMP_PORT=62613 STOMP_NMSGS=10 go run publish.go
40 # Review Apollo balancing characteristics:
41 STOMP_PORT=62613 go run recv_mds.go
44 package main
46 import (
47 "fmt"
48 "github.com/gmallard/stompngo"
49 "github.com/gmallard/stompngo_examples/sngecomm"
50 "log"
51 "net"
52 "runtime"
53 "sync"
54 "time"
57 var (
58 exampid = "recv_mds: "
59 ns = 4 // Number of subscriptions
60 wgrecv sync.WaitGroup
61 n net.Conn // Network Connection
62 conn *stompngo.Connection // Stomp Connection
63 ack bool = false // ack mode control
64 port string
67 func recv(s int) {
68 // Setup Headers ...
69 id := stompngo.Uuid() // Use package convenience function for unique ID
70 d := sngecomm.Dest()
71 var r <-chan stompngo.MessageData
72 if ack {
73 r = sngecomm.Subscribe(conn, d, id, "client-individual")
74 } else {
75 r = sngecomm.Subscribe(conn, d, id, "auto")
77 // Receive loop
78 for {
79 d := <-r // Read a messagedata struct
80 if d.Error != nil {
81 panic(d.Error)
83 m := d.Message.BodyString()
84 fmt.Println(exampid, "subnumber", s, m, id)
85 runtime.Gosched()
86 time.Sleep(1 * time.Second)
87 if ack {
88 sngecomm.Ack(conn, d.Message.Headers, id)
89 fmt.Println(exampid + "ACK complete ...")
91 runtime.Gosched()
93 wgrecv.Done() // Never get here, cancel via ^C
96 // Connect to a STOMP broker, receive and ack some messages.
97 // Disconnect never occurs, kill via ^C.
98 func main() {
99 fmt.Println(exampid, "starts ...")
101 // Set up the connection.
102 h, port := sngecomm.HostAndPort() //
103 n, e := net.Dial("tcp", net.JoinHostPort(h, port))
104 if e != nil {
105 log.Fatalln(e) // Handle this ......
107 fmt.Println(exampid, "dial complete ...")
108 ch := sngecomm.ConnectHeaders()
109 conn, e = stompngo.Connect(n, ch)
110 if e != nil {
111 log.Fatalln(e) // Handle this ......
113 fmt.Println(exampid, "stomp connect complete ...", conn.Protocol())
115 wgrecv.Add(ns) // Number of subscriptions, hard coded in this demonstartion
116 for i := 1; i <= ns; i++ {
117 go recv(i)
119 fmt.Println(exampid, "receivers started ...")
121 wgrecv.Wait() // This will never complete, use ^C to cancel