From 9b46c3c4a5410397dacb33fcdf8f6108e3fb9ca6 Mon Sep 17 00:00:00 2001 From: "Guy M. Allard" Date: Mon, 4 Sep 2017 11:39:47 -0400 Subject: [PATCH] publish: multiple go routines. --- publish/publish.go | 55 ++++++++++++++++++++++++++++++++++--------------- sngecomm/environment.go | 27 ++++++++++++++++++++++-- 2 files changed, 63 insertions(+), 19 deletions(-) diff --git a/publish/publish.go b/publish/publish.go index 1ead1b3..3730d90 100644 --- a/publish/publish.go +++ b/publish/publish.go @@ -46,7 +46,9 @@ package main import ( "fmt" "log" + "net" "os" + "sync" "time" // "github.com/gmallard/stompngo" @@ -60,22 +62,15 @@ var ( exampid = "publish: " ll = log.New(os.Stdout, "EPUB ", log.Ldate|log.Lmicroseconds|log.Lshortfile) tag = "pubmain" + wg sync.WaitGroup + conn *stompngo.Connection + n net.Conn + e error ) -// Connect to a STOMP broker, publish some messages and disconnect. -func main() { - - st := time.Now() - - // Standard example connect sequence - n, conn, e := sngecomm.CommonConnect(exampid, tag, ll) - if e != nil { - ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v", - exampid, tag, sngecomm.Lcs, - e.Error()) // Handle this ...... - } - +func runSends(gr int) { // *NOTE* application specific functionaltiy starts here! + grs := fmt.Sprintf("%d", gr) sh := stompngo.Headers{"destination", sngecomm.Dest()} ll.Printf("%stag:%s connsess:%s destination dest:%s\n", exampid, tag, conn.Session(), @@ -85,22 +80,48 @@ func main() { } ms := exampid + "message: " for i := 1; i <= senv.Nmsgs(); i++ { - mse := ms + fmt.Sprintf("%d", i) + mse := ms + " grs:" + grs + " msgnum:" + fmt.Sprintf("%d", i) ll.Printf("%stag:%s connsess:%s main_sending mse:~%s~\n", exampid, tag, conn.Session(), mse) - e := conn.Send(sh, mse) - if e != nil { + + if os.Getenv("STOMP_GORSLEEP") != "" { + time.Sleep(50 * time.Millisecond) + } + + err := conn.Send(sh, mse) + if err != nil { ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v", exampid, tag, conn.Session(), - e.Error()) // Handle this ...... + err.Error()) // Handle this ...... } ll.Printf("%stag:%s connsess:%s main_send_complete mse:~%s~\n", exampid, tag, conn.Session(), mse) time.Sleep(100 * time.Millisecond) } + wg.Done() // *NOTE* application specific functionaltiy ends here! +} + +// Connect to a STOMP broker, publish some messages and disconnect. +func main() { + + st := time.Now() + + // Standard example connect sequence + n, conn, e = sngecomm.CommonConnect(exampid, tag, ll) + if e != nil { + ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v", + exampid, tag, sngecomm.Lcs, + e.Error()) // Handle this ...... + } + + for i := 0; i < sngecomm.Ngors(); i++ { + wg.Add(1) + go runSends(i) + } + wg.Wait() // Standard example disconnect sequence e = sngecomm.CommonDisconnect(n, conn, exampid, tag, ll) diff --git a/sngecomm/environment.go b/sngecomm/environment.go index 4f7fdd5..cff8b4d 100644 --- a/sngecomm/environment.go +++ b/sngecomm/environment.go @@ -34,8 +34,11 @@ var ( nqs = 1 // Default number of queues for multi-queue demo(s) mdml = 1024 * 32 // Message data max length of variable message, 32K md = make([]byte, 1) // Additional message data, primed during init() - pbc = 64 // Number of bytes to print (used in some - // // examples that receive). + pbc = 64 // Number of bytes to print (used in some examples that receive). + + ngors = 1 // Number of go routines to use (publish) + gorsleep = "" // If non-empty, go routines will sleep (publish) + // sendFact float64 = 1.0 // Send sleep time factor recvFact float64 = 1.0 // Receive sleep time factor @@ -66,6 +69,20 @@ func init() { // } +// Number of go routines +func Ngors() int { + // + if s := os.Getenv("STOMP_NGORS"); s != "" { + i, e := strconv.ParseInt(s, 10, 32) + if nil != e { + log.Printf("v1:%v v2:%v\n", "NGORS conversion error", e) + } else { + ngors = int(i) + } + } + return ngors +} + // Number of queues func Nqs() int { // @@ -162,6 +179,12 @@ func Pbc() int { return pbc } +// Whether go routines will sleep or not +func Gorsleep() string { + gorsleep = os.Getenv("STOMP_GORSLEEP") + return gorsleep +} + // Does receive wait to simulate message processing func RecvWait() bool { f := os.Getenv("STOMP_RECVWAIT") -- 2.11.4.GIT