From dd22538848f743501e62520b8e3541a2337d881a Mon Sep 17 00:00:00 2001 From: "Guy M. Allard" Date: Wed, 10 Aug 2016 00:00:00 -0400 Subject: [PATCH] Same connection example and script, details: - Do not call conn.Connect() over and over - Sanity check message payload, error if not correct --- adhoc/varmGetter/vrmSameConn/vrmSameConn.go | 64 ++++++++++++++++++----------- adhoc/varmGetter/vrmSameConn/vrmasc.sh | 3 -- version.go | 2 +- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/adhoc/varmGetter/vrmSameConn/vrmSameConn.go b/adhoc/varmGetter/vrmSameConn/vrmSameConn.go index 3809d23..3141332 100644 --- a/adhoc/varmGetter/vrmSameConn/vrmSameConn.go +++ b/adhoc/varmGetter/vrmSameConn/vrmSameConn.go @@ -39,6 +39,8 @@ var ( unsub = true dodisc = true ar = false // Want ACK RECEIPT + session = "" + wlp = "publish: message: " // Left part wanted ) func init() { @@ -65,7 +67,7 @@ func main() { exampid, tag, sngecomm.Lcs, e.Error()) // Handle this ...... } - + session = conn.Session() //****************** nqs := sngecomm.Nqs() for qn := 1; qn <= nqs; qn++ { @@ -78,19 +80,19 @@ func main() { e = sngecomm.CommonDisconnect(n, conn, exampid, tag, ll) if e != nil { ll.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v", - exampid, tag, conn.Session(), + exampid, tag, session, e.Error()) // Handle this ...... } ll.Printf("%stag:%s connsess:%s disconnect_receipt:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, conn.DisconnectReceipt) } else { ll.Printf("%stag:%s connsess:%s skipping_disconnect\n", - exampid, tag, conn.Session()) + exampid, tag, session) } ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, time.Now().Sub(st)) } @@ -113,7 +115,7 @@ func runNextQueue(qn int, conn *stompngo.Connection) { if conn.Protocol() == stompngo.SPL_10 && am == stompngo.AckModeClientIndividual { ll.Fatalf("%stag:%s connsess:%s invalid_ack_mode am:%v proto:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, am, conn.Protocol()) // } // Do not do final ACK if running ACKs are issued @@ -124,13 +126,13 @@ func runNextQueue(qn int, conn *stompngo.Connection) { // Show run parameters ll.Printf("%stag:%s connsess:%s run_parms\n\tqns:%v\n\tpbc:%v\n\td:%v\n\tid:%v\n\tnmsgs:%v\n\tam:%v\n\tnfa:%v\n\twh:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, qns, pbc, d, id, nmsgs, am, nfa, wh) // Run SUBSCRIBE sc := doSubscribe(conn, d, id, am, wh) ll.Printf("%stag:%s connsess:%s stomp_subscribe_complete\n", - exampid, tag, conn.Session()) + exampid, tag, session) var md stompngo.MessageData // Message data from basic read var lmd stompngo.MessageData // Possible save (copy) of received data @@ -140,7 +142,7 @@ func runNextQueue(qn int, conn *stompngo.Connection) { GetLoop: for { ll.Printf("%stag:%s connsess:%s start_of_read_loop mc:%v nmsgs:%v\n", - exampid, tag, conn.Session(), mc, nmsgs) + exampid, tag, session, mc, nmsgs) mcs := fmt.Sprintf("%d", mc) // string number message count @@ -151,12 +153,12 @@ GetLoop: // if md.Message.Command == stompngo.RECEIPT { ll.Printf("%stag:%s connsess:%s have_receipt md:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, md) continue GetLoop } ll.Fatalf("%stag:%s connsess:%s ERROR_frame hdrs:%v body:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, md.Message.Headers, string(md.Message.Body)) // Handle this ...... } @@ -166,29 +168,30 @@ GetLoop: } // Basic loop logging - ll.Printf("%stag:%s connsess:%s channel_read_complete\n", - exampid, tag, conn.Session()) + ll.Printf("%stag:%s connsess:%s channel_read_complete qn:%d mc:%d\n", + exampid, tag, session, + qn, mc) ll.Printf("%stag:%s connsess:%s message_number:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, mc) // Check if reader returned any error if md.Error != nil { ll.Fatalf("%stag:%s connsess:%s error_read error:%v", - exampid, tag, conn.Session(), + exampid, tag, session, md.Error) // Handle this ...... } // Show frame type ll.Printf("%stag:%s connsess:%s frame_type cmd:%s\n", - exampid, tag, conn.Session(), + exampid, tag, session, md.Message.Command) // Pure sanity check: this should *never* happen based on logic // above. if md.Message.Command != stompngo.MESSAGE { ll.Fatalf("%stag:%s connsess:%s error_frame_type md:%v", - exampid, tag, conn.Session(), + exampid, tag, session, md) // Handle this ...... } @@ -196,7 +199,7 @@ GetLoop: wh := md.Message.Headers for j := 0; j < len(wh)-1; j += 2 { ll.Printf("%stag:%s connsess:%s Header:%s:%s\n", - exampid, tag, conn.Session(), + exampid, tag, session, wh[j], wh[j+1]) } // Show (part of) Message Body @@ -207,10 +210,23 @@ GetLoop: } ss := string(md.Message.Body[0:maxlen]) ll.Printf("%stag:%s connsess:%s payload body:%s\n", - exampid, tag, conn.Session(), + exampid, tag, session, ss) } + // Sanity check this message payload + wm := wlp + mcs // The left part plus the (string) meassage number] + bm := string(md.Message.Body) + if bm != wm { + ll.Fatalf("%stag:%s connsess:%s error_message_payload\n\tGot %s\n\tWant%s\n", + exampid, tag, session, + bm, wm) // Handle this ...... + } else { + ll.Printf("%stag:%s connsess:%s matched_body_string\n%s\n%s\n", + exampid, tag, session, + bm, wm) // Handle this ......) + } + // Run individual ACK if required if am == stompngo.AckModeClientIndividual { wh := md.Message.Headers // Copy Headers @@ -219,7 +235,7 @@ GetLoop: } sngecomm.HandleAck(conn, wh, id) ll.Printf("%stag:%s connsess:%s individual_ack_complete mc:%v headers:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, mc, md.Message.Headers) } @@ -241,7 +257,7 @@ GetLoop: } sngecomm.HandleAck(conn, wh, id) ll.Printf("%stag:%s connsess:%s final_ack_complete\n", - exampid, tag, conn.Session()) + exampid, tag, session) if ar { getReceipt(conn) } @@ -251,10 +267,10 @@ GetLoop: if unsub { sngecomm.HandleUnsubscribe(conn, d, id) ll.Printf("%stag:%s connsess:%s stomp_unsubscribe_complete\n", - exampid, tag, conn.Session()) + exampid, tag, session) } else { ll.Printf("%stag:%s connsess:%s skipping_unsubscribe\n", - exampid, tag, conn.Session()) + exampid, tag, session) } } @@ -286,6 +302,6 @@ func doSubscribe(c *stompngo.Connection, d, id, a string, h stompngo.Headers) <- func getReceipt(conn *stompngo.Connection) { rd := <-conn.MessageData ll.Printf("%stag:%s connsess:%s have_receipt_sub md:%v\n", - exampid, tag, conn.Session(), + exampid, tag, session, rd) } diff --git a/adhoc/varmGetter/vrmSameConn/vrmasc.sh b/adhoc/varmGetter/vrmSameConn/vrmasc.sh index fa3b53d..9010831 100755 --- a/adhoc/varmGetter/vrmSameConn/vrmasc.sh +++ b/adhoc/varmGetter/vrmSameConn/vrmasc.sh @@ -7,8 +7,6 @@ cmd_base=$(dirname $0) pushd $cmd_base # ------------------------------------------------------------------------------ go build vrmSameConn.go -# ------------------------------------------------------------------------------ -$cmd_base/../puts.sh # .............................................................................. # Experiment with these: export VMG_NOUNSUB=y @@ -22,4 +20,3 @@ export STOMP_NQS=9 # ------------------------------------------------------------------------------ popd set +x - diff --git a/version.go b/version.go index 7155cf1..dda9f84 100644 --- a/version.go +++ b/version.go @@ -35,7 +35,7 @@ var ( //patch = "5" // Patch - patch = "5.plvl.003" // Patch + patch = "5.plvl.004" // Patch ) func Version() string { -- 2.11.4.GIT