From 7cc24b1e7743786641b2cb9dd15b9310d68b55c7 Mon Sep 17 00:00:00 2001 From: "Guy M. Allard" Date: Sun, 11 Sep 2016 01:36:55 -0400 Subject: [PATCH] Add support for Artemis style JMS queue names. --- conndisc/conndisc.go | 5 +++++ publish/publish.go | 2 +- receipts/onack/onack.go | 4 ++-- receipts/onsend/onsend.go | 2 +- recv_mds/recv_mds.go | 4 ++-- sngecomm/utilities.go | 15 ++++++++++++++- srmgor_1conn/srmgor_1conn.go | 13 +++++++++---- srmgor_1smrconn/srmgor_1smrconn.go | 4 ++-- srmgor_2conn/srmgor_2conn.go | 4 ++-- srmgor_manyconn/srmgor_manyconn.go | 4 ++-- version.go | 2 +- 11 files changed, 41 insertions(+), 18 deletions(-) diff --git a/conndisc/conndisc.go b/conndisc/conndisc.go index 86584fd..b7bd47c 100644 --- a/conndisc/conndisc.go +++ b/conndisc/conndisc.go @@ -65,6 +65,11 @@ func main() { // Standard example connect sequence n, conn, e := sngecomm.CommonConnect(exampid, tag, ll) if e != nil { + if conn != nil { + ll.Printf("%stag%s connsess:%s Connect Response headers:%v body%s\n", + exampid, tag, conn.Session(), conn.ConnectResponse.Headers, + string(conn.ConnectResponse.Body)) + } ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v", exampid, tag, sngecomm.Lcs, e.Error()) // Handle this ...... diff --git a/publish/publish.go b/publish/publish.go index 039b2ae..0a5dd69 100644 --- a/publish/publish.go +++ b/publish/publish.go @@ -76,7 +76,7 @@ func main() { } // *NOTE* application specific functionaltiy starts here! - sh := stompngo.Headers{"destination", senv.Dest()} + sh := stompngo.Headers{"destination", sngecomm.Dest()} if senv.Persistent() { sh = sh.Add("persistent", "true") } diff --git a/receipts/onack/onack.go b/receipts/onack/onack.go index 08a4545..a5ebeb0 100644 --- a/receipts/onack/onack.go +++ b/receipts/onack/onack.go @@ -101,14 +101,14 @@ func main() { // **************************************** // App logic here ..... - d := senv.Dest() + d := sngecomm.Dest() ll.Printf("%stag:%s connsess:%s destination:%v\n", exampid, tag, conn.Session(), d) // **************************************** // Send exactly one message. - sh := stompngo.Headers{"destination", senv.Dest()} + sh := stompngo.Headers{"destination", sngecomm.Dest()} if senv.Persistent() { sh = sh.Add("persistent", "true") } diff --git a/receipts/onsend/onsend.go b/receipts/onsend/onsend.go index 1b18694..68fea87 100644 --- a/receipts/onsend/onsend.go +++ b/receipts/onsend/onsend.go @@ -76,7 +76,7 @@ func main() { // App logic here ..... // Send exactly one message. Ask for a receipt. - d := senv.Dest() + d := sngecomm.Dest() ll.Printf("%stag:%s connsess:%s destination:%v\n", exampid, tag, conn.Session(), d) diff --git a/recv_mds/recv_mds.go b/recv_mds/recv_mds.go index 713fae8..0d9084a 100644 --- a/recv_mds/recv_mds.go +++ b/recv_mds/recv_mds.go @@ -55,7 +55,7 @@ import ( // "github.com/gmallard/stompngo" // senv methods could be used in general by stompngo clients. - "github.com/gmallard/stompngo/senv" + // sngecomm methods are used specifically for these example clients. "github.com/gmallard/stompngo_examples/sngecomm" ) @@ -81,7 +81,7 @@ func recv(conn *stompngo.Connection, s int) { // Setup Headers ... id := stompngo.Uuid() // Use package convenience function for unique ID - d := senv.Dest() + d := sngecomm.Dest() ackMode = sngecomm.AckMode() // get ack mode pbc := sngecomm.Pbc() // Print byte count diff --git a/sngecomm/utilities.go b/sngecomm/utilities.go index 63c9a6a..5a2e069 100644 --- a/sngecomm/utilities.go +++ b/sngecomm/utilities.go @@ -270,7 +270,7 @@ func CommonConnect(exampid, tag string, l *log.Logger) (net.Conn, ch) conn, e := stompngo.Connect(n, ch) if e != nil { - return nil, nil, e + return nil, conn, e } l.Printf("%stag:%s connsess:%s common_connect_complete host:%s port:%s vhost:%s protocol:%s server:%s\n", exampid, tag, conn.Session(), @@ -404,3 +404,16 @@ func CommonTLSConnect(exampid, tag string, l *log.Logger, // return nc, conn, nil } + +// Example destination +func Dest() string { + d := senv.Dest() + if os.Getenv("STOMP_ARTEMIS") == "" { + return d + } + pref := "jms.queue" + if strings.Index(d, "topic") >= 0 { + pref = "jms.topic" + } + return pref + strings.Replace(d, "/", ".", -1) +} diff --git a/srmgor_1conn/srmgor_1conn.go b/srmgor_1conn/srmgor_1conn.go index dae4a00..9a9b754 100644 --- a/srmgor_1conn/srmgor_1conn.go +++ b/srmgor_1conn/srmgor_1conn.go @@ -82,7 +82,7 @@ func sender(qn, mc int) { qns := fmt.Sprintf("%d", qn) // string queue number id := stompngo.Uuid() // A unique sender id - d := senv.Dest() + "." + qns + d := sngecomm.Dest() + "." + qns ll.Printf("%stag:%s connsess:%s queue_info id:%v d:%v qnum:%v mc:%v\n", exampid, ltag, conn.Session(), @@ -136,7 +136,7 @@ func receiver(qn, mc int) { qns := fmt.Sprintf("%d", qn) // string queue number pbc := sngecomm.Pbc() id := stompngo.Uuid() // A unique subscription ID - d := senv.Dest() + "." + qns + d := sngecomm.Dest() + "." + qns ll.Printf("%stag:%s connsess:%s queue_info id:%v d:%v qnum:%v mc:%v\n", exampid, ltag, conn.Session(), @@ -159,9 +159,9 @@ func receiver(qn, mc int) { case md = <-sc: case md = <-conn.MessageData: // A RECEIPT or ERROR frame is unexpected here - ll.Fatalf("%stag:%s connsess:%s bad_frame qnum:%v md:%v", + ll.Fatalf("%stag:%s connsess:%s bad_frame qnum:%v headers:%v body:%s", exampid, tag, conn.Session(), - qn, md) // Handle this ...... + qn, md.Message.Headers, md.Message.Body) // Handle this ...... } if md.Error != nil { ll.Fatalf("%stag:%s connsess:%s recv_error qnum:%v error:%v", @@ -342,6 +342,11 @@ func main() { var e error n, conn, e = sngecomm.CommonConnect(exampid, tag, ll) if e != nil { + if conn != nil { + ll.Printf("%stag:%s connsess:%s Connect Response headers:%v body%s\n", + exampid, tag, conn.Session(), conn.ConnectResponse.Headers, + string(conn.ConnectResponse.Body)) + } ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v", exampid, tag, sngecomm.Lcs, e.Error()) // Handle this ...... diff --git a/srmgor_1smrconn/srmgor_1smrconn.go b/srmgor_1smrconn/srmgor_1smrconn.go index ca671e5..514d6fa 100644 --- a/srmgor_1smrconn/srmgor_1smrconn.go +++ b/srmgor_1smrconn/srmgor_1smrconn.go @@ -113,7 +113,7 @@ func runReceive(conn *stompngo.Connection, q int, w *sync.WaitGroup) { qns := fmt.Sprintf("%d", q) // queue number id := stompngo.Uuid() // A unique subscription ID - d := senv.Dest() + "." + qns + d := sngecomm.Dest() + "." + qns ll.Printf("%stag:%s connsess:%s starts id:%s qns:%s d:%s\n", exampid, ltag, conn.Session(), @@ -347,7 +347,7 @@ runSender sends all messages to a specified queue. func runSender(conn *stompngo.Connection, qns string) { ltag := tag + "-runsender" - d := senv.Dest() + "." + qns + d := sngecomm.Dest() + "." + qns id := stompngo.Uuid() // A unique sender id ll.Printf("%stag:%s connsess:%s start id:%s dest:%s\n", exampid, ltag, conn.Session(), diff --git a/srmgor_2conn/srmgor_2conn.go b/srmgor_2conn/srmgor_2conn.go index 4ead978..6e36006 100644 --- a/srmgor_2conn/srmgor_2conn.go +++ b/srmgor_2conn/srmgor_2conn.go @@ -78,7 +78,7 @@ func sender(conn *stompngo.Connection, qn, nmsgs int) { ltag := tag + "-sender" qns := fmt.Sprintf("%d", qn) // queue number - d := senv.Dest() + "." + qns + d := sngecomm.Dest() + "." + qns ll.Printf("%stag:%s connsess:%s starts qn:%d nmsgs:%d d:%s\n", exampid, ltag, conn.Session(), qn, nmsgs, d) @@ -205,7 +205,7 @@ func receiver(conn *stompngo.Connection, qn, nmsgs int) { exampid, ltag, conn.Session(), qn, nmsgs) // - qp := senv.Dest() // queue name prefix + qp := sngecomm.Dest() // queue name prefix q := qp + "." + qns ll.Printf("%stag:%s connsess:%s queue_info q:%s qn:%d nmsgs:%d\n", exampid, ltag, conn.Session(), diff --git a/srmgor_manyconn/srmgor_manyconn.go b/srmgor_manyconn/srmgor_manyconn.go index f2d0e42..2316201 100644 --- a/srmgor_manyconn/srmgor_manyconn.go +++ b/srmgor_manyconn/srmgor_manyconn.go @@ -81,7 +81,7 @@ func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { ltag := tag + "-sendmessages" qns := fmt.Sprintf("%d", qnum) // queue number - d := senv.Dest() + "." + qns + d := sngecomm.Dest() + "." + qns ll.Printf("%stag:%s connsess:%s start d:%s qnum:%d\n", exampid, ltag, conn.Session(), d, qnum) @@ -126,7 +126,7 @@ func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { ltag := tag + "-receivemessages" qns := fmt.Sprintf("%d", qnum) // queue number - d := senv.Dest() + "." + qns + d := sngecomm.Dest() + "." + qns id := stompngo.Uuid() // A unique subscription ID ll.Printf("%stag:%s connsess:%s receiveMessages_start id:%s d:%s qnum:%d nmsgs:%d\n", diff --git a/version.go b/version.go index fa32f90..86d76d6 100644 --- a/version.go +++ b/version.go @@ -35,7 +35,7 @@ var ( //patch = "5" // Patch - patch = "5.plvl.008" // Patch + patch = "5.plvl.009" // Patch ) func Version() string { -- 2.11.4.GIT