From d7ede51f45573156da46e9e177df9571dacd8897 Mon Sep 17 00:00:00 2001 From: gmallard Date: Thu, 28 Jul 2016 07:21:28 -0400 Subject: [PATCH] srmgor_manyconn.go updated for common code / logging. --- srmgor_manyconn/srmgor_manyconn.go | 250 ++++++++++++++++++++----------------- version.go | 2 +- 2 files changed, 136 insertions(+), 116 deletions(-) diff --git a/srmgor_manyconn/srmgor_manyconn.go b/srmgor_manyconn/srmgor_manyconn.go index 87cd1f2..9c36f99 100644 --- a/srmgor_manyconn/srmgor_manyconn.go +++ b/srmgor_manyconn/srmgor_manyconn.go @@ -50,7 +50,7 @@ import ( ) var ( - exampid = "srmgor_manyconn:" + exampid = "srmgor_manyconn: " wgs sync.WaitGroup wgr sync.WaitGroup @@ -73,13 +73,18 @@ var ( nmsgs = senv.Nmsgs() ll = log.New(os.Stdout, "EMSMR ", log.Ldate|log.Lmicroseconds|log.Lshortfile) + + tag = "manyconn" ) func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { + ltag := tag + "-sendmessages" + qns := fmt.Sprintf("%d", qnum) // queue number d := senv.Dest() + "." + qns - ll.Printf("%s connsess:%s sendMessages_start d:%s qnum:%d\n", - exampid, conn.Session(), d, qnum) + ll.Printf("%stag:%s connsess:%s start d:%s qnum:%d\n", + exampid, ltag, conn.Session(), + d, qnum) wh := stompngo.Headers{"destination", d, "qnum", qns} // send Headers if senv.Persistent() { @@ -93,11 +98,14 @@ func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { sh := append(wh, "msgnum", mcs) // Generate a message to send ............... - ll.Printf("%s connsess:%s sendMessages_message mc:%d qnum:%d\n", - exampid, conn.Session(), mc, qnum) + ll.Printf("%stag:%s connsess:%s message mc:%d qnum:%d\n", + exampid, ltag, conn.Session(), + mc, qnum) e := conn.Send(sh, string(sngecomm.Partial())) if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v v4:%v\n", exampid, "send:", e, nc.LocalAddr().String(), qnum) + ll.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v", + exampid, ltag, conn.Session(), + qnum, e.Error()) // Handle this ...... } if mc == nmsgs { break @@ -105,8 +113,11 @@ func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { if sw { runtime.Gosched() // yield for this example dt := time.Duration(sngecomm.ValueBetween(min, max, sf)) - ll.Printf("%s connsess:%s sendMessages_stagger dt:%v qnum:%d\n", - exampid, conn.Session(), dt, qnum) + //ll.Printf("%s connsess:%s sendMessages_stagger dt:%v qnum:%d\n", + // exampid, conn.Session(), dt, qnum) + ll.Printf("%stag:%s connsess:%s send_stagger dt:%v qnum:%s mc:%d\n", + exampid, ltag, conn.Session(), + dt, qnum, mc) tmr.Reset(dt) _ = <-tmr.C } @@ -114,13 +125,16 @@ func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { } func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { + ltag := tag + "-receivemessages" + qns := fmt.Sprintf("%d", qnum) // queue number d := senv.Dest() + "." + qns + id := stompngo.Uuid() // A unique subscription ID - ll.Printf("%s connsess:%s receiveMessages_start d:%s qnum:%d nmsgs:%d\n", - exampid, conn.Session(), d, qnum, nmsgs) + ll.Printf("%stag:%s connsess:%s receiveMessages_start id:%s d:%s qnum:%d nmsgs:%d\n", + exampid, ltag, conn.Session(), + id, d, qnum, nmsgs) // Subscribe - id := stompngo.Uuid() // A unique subscription ID sc := sngecomm.HandleSubscribe(conn, d, id, sngecomm.AckMode()) pbc := sngecomm.Pbc() // Print byte count @@ -134,16 +148,27 @@ func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { case md = <-sc: case md = <-conn.MessageData: // Frames RECEIPT or ERROR not expected here - ll.Fatalf("%s v1:%v\n", exampid, md) // Handle this + ll.Fatalf("%stag:%s connsess:%s send_error qns:%v md:%v", + exampid, ltag, conn.Session(), + qns, md) // Handle this ...... } if md.Error != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v v4:%v\n", exampid, "recv read:", md.Error, nc.LocalAddr().String(), qnum) + ll.Fatalf("%stag:%s connsess:%s receive_error qns:%v error:%v\n", + exampid, ltag, conn.Session(), + qns, md.Error) + } + + if md.Message.Command != stompngo.MESSAGE { + ll.Fatalf("%stag:%s connsess:%s bad_frame qns:%s mc:%d md:%v\n", + exampid, ltag, conn.Session(), + qns, mc, md) } - // Sanity check the queue and message numbers mcs := fmt.Sprintf("%d", mc) // message number if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mcs) { - ll.Fatalf("%s v1:%v v2:%v v3:%v v4:%v\n", exampid, "Bad Headers", md.Message.Headers, qns, mcs) + ll.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v md:%v", + exampid, tag, conn.Session(), + qns, mcs, md) // Handle this ...... } // Process the inbound message ................. @@ -154,8 +179,9 @@ func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { sl = len(md.Message.Body) } } - ll.Printf("%s connsess:%s receiveMessages_msg d:%s body:%s qnum:%d msgnum:%s\n", - exampid, conn.Session(), d, string(md.Message.Body[0:sl]), qnum, + ll.Printf("%stag:%s connsess:%s receiveMessages_msg d:%s body:%s qnum:%d msgnum:%s\n", + exampid, ltag, conn.Session(), + d, string(md.Message.Body[0:sl]), qnum, md.Message.Headers.Value("msgnum")) if mc == nmsgs { break @@ -165,18 +191,25 @@ func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { ah := []string{} sngecomm.HandleAck(conn, ah, id) } + if mc == nmsgs { + break + } // if rw { runtime.Gosched() // yield for this example dt := time.Duration(sngecomm.ValueBetween(min, max, rf)) - ll.Printf("%s connsess:%s recvMessages_stagger dt:%v qnum:%d\n", - exampid, conn.Session(), dt, qnum) + //ll.Printf("%s connsess:%s recvMessages_stagger dt:%v qnum:%d\n", + // exampid, conn.Session(), dt, qnum) + ll.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s mc:%d\n", + exampid, ltag, conn.Session(), + dt, qns, mc) tmr.Reset(dt) _ = <-tmr.C } } - ll.Printf("%s connsess:%s receiveMessages_end d:%s qnum:%d nmsgs:%d\n", - exampid, conn.Session(), d, qnum, nmsgs) + ll.Printf("%stag:%s connsess:%s end d:%s qnum:%d nmsgs:%d\n", + exampid, ltag, conn.Session(), + d, qnum, nmsgs) // Unsubscribe sngecomm.HandleUnsubscribe(conn, d, id) @@ -184,119 +217,78 @@ func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) { } func runReceiver(qnum int) { - ll.Printf("%s runReceiver_start qnum:%d\n", exampid, qnum) - // Network Open - h, p := senv.HostAndPort() // host and port - hap := net.JoinHostPort(h, p) - n, e := net.Dial("tcp", hap) - if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "recv nectonnr:", qnum, e) // Handle this ...... - } + ltag := tag + "-runreceiver" - ll.Printf("%s runReceiver_start open_complete qnum:%d\n", exampid, qnum) - ll.Printf("%s runReceiver_start open_complete local:%s qnum:%d\n", - exampid, n.LocalAddr().String(), qnum) - ll.Printf("%s runReceiver_start open_complete remote:%s qnum:%d\n", - exampid, n.RemoteAddr().String(), qnum) + ll.Printf("%stag:%s connsess:%s start qnum:%d\n", + exampid, ltag, sngecomm.Lcs, + qnum) - // Stomp connect - ch := sngecomm.ConnectHeaders() - - ll.Printf("%s runReceiver_start ch:%v vhost:%s protocol:%s qnum:%d\n", - exampid, ch, senv.Vhost(), senv.Protocol(), qnum) - conn, e := stompngo.Connect(n, ch) + // Standard example connect sequence + n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll) if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "recv stompconnect:", qnum, e) // Handle this ...... + ll.Fatalf("%stag:%s connsess:%s error:%s\n", + exampid, ltag, sngecomm.Lcs, + e.Error()) // Handle this ...... } - ll.Printf("%s connsess:%s runReceiver_stompconnect qnum:%d\n", - exampid, conn.Session(), qnum) - // conn.SetSubChanCap(senv.SubChanCap()) // Experiment with this value, YMMV // Receives receiveMessages(conn, qnum, n) - ll.Printf("%s connsess:%s runReceiver_receives_complete qnum:%d\n", - exampid, conn.Session(), qnum) - - // Disconnect from Stomp server - eh := stompngo.Headers{"recv_discqueue", fmt.Sprintf("%d", qnum)} - e = conn.Disconnect(eh) - if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "recv disconnects:", qnum, e) // Handle this ...... - } - - ll.Printf("%s connsess:%s runReceiver_disconnect_complete qnum:%d\n", - exampid, conn.Session(), qnum) + ll.Printf("%stag:%s connsess:%s receives_complete qnum:%d\n", + exampid, ltag, conn.Session(), + qnum) - // Network close - e = n.Close() + // Standard example disconnect sequence + e = sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll) if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "recv netcloser", qnum, e) // Handle this ...... + ll.Fatalf("%stag:%s connsess:%s error:%s\n", + exampid, ltag, conn.Session(), + e.Error()) // Handle this ...... } - ll.Printf("%s connsess:%s runReceiver_net_close_complete qnum:%d\n", - exampid, conn.Session(), qnum) - sngecomm.ShowStats(exampid, "recv "+fmt.Sprintf("%d", qnum), conn) + sngecomm.ShowStats(exampid, "recv_"+fmt.Sprintf("%d", qnum), conn) wgr.Done() } func runSender(qnum int) { - ll.Printf("%s runSender_start qnum:%d\n", exampid, qnum) - // Network Open - h, p := senv.HostAndPort() // host and port - n, e := net.Dial("tcp", net.JoinHostPort(h, p)) - if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "send nectonnr:", qnum, e) // Handle this ...... - } - - ll.Printf("%s runSender_start open_complete qnum:%d\n", exampid, qnum) - ll.Printf("%s runSender_start open_complete local:%s qnum:%d\n", - exampid, n.LocalAddr().String(), qnum) - ll.Printf("%s runSender_start open_complete remote:%s qnum:%d\n", - exampid, n.RemoteAddr().String(), qnum) - - // Stomp connect - ch := sngecomm.ConnectHeaders() + ltag := tag + "-runsender" - ll.Printf("%s runSender_start ch:%v vhost:%s protocol:%s qnum:%d\n", - exampid, ch, senv.Vhost(), senv.Protocol(), qnum) - conn, e := stompngo.Connect(n, ch) + ll.Printf("%stag:%s connsess:%s start qnum:%d\n", + exampid, ltag, sngecomm.Lcs, + qnum) + // Standard example connect sequence + n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll) if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "send stompconnect:", qnum, e) // Handle this ...... + ll.Fatalf("%stag:%s connsess:%s error:%s\n", + exampid, ltag, sngecomm.Lcs, + e.Error()) // Handle this ...... } - ll.Printf("%s connsess:%s runSender_stompconnect qnum:%d\n", - exampid, conn.Session(), qnum) // sendMessages(conn, qnum, n) - ll.Printf("%s connsess:%s runSender_sends_complete qnum:%d\n", - exampid, conn.Session(), qnum) - // Disconnect from Stomp server - eh := stompngo.Headers{"send_discqueue", fmt.Sprintf("%d", qnum)} - e = conn.Disconnect(eh) - if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "send disconnects:", qnum, e) // Handle this ...... - } + ll.Printf("%stag:%s connsess:%s sends_complete qnum:%d\n", + exampid, ltag, conn.Session(), + qnum) - ll.Printf("%s connsess:%s runSender_disconnect_complete qnum:%d\n", - exampid, conn.Session(), qnum) - // Network close - e = n.Close() + // Standard example disconnect sequence + e = sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll) if e != nil { - ll.Fatalf("%s v1:%v v2:%v v3:%v\n", exampid, "send netcloser", qnum, e) // Handle this ...... + ll.Fatalf("%stag:%s connsess:%s error:%s\n", + exampid, ltag, conn.Session(), + e.Error()) // Handle this ...... } - - ll.Printf("%s connsess:%s runSender_net_close_complete qnum:%d\n", - exampid, conn.Session(), qnum) - sngecomm.ShowStats(exampid, "send "+fmt.Sprintf("%d", qnum), conn) + sngecomm.ShowStats(exampid, "send_"+fmt.Sprintf("%d", qnum), conn) wgs.Done() } func main() { + + st := time.Now() + sngecomm.ShowRunParms(exampid) if sngecomm.Pprof() { @@ -309,43 +301,71 @@ func main() { defer profile.Start(&cfg).Stop() } - tn := time.Now() - ll.Printf("%s v1:%v\n", exampid, "main starts") + ll.Printf("%stag:%s connsess:%s main_starts\n", + exampid, tag, sngecomm.Lcs) + + ll.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n", + exampid, tag, sngecomm.Lcs, + sngecomm.Pprof()) + + ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n", + exampid, tag, sngecomm.Lcs, + runtime.GOMAXPROCS(-1)) + if sngecomm.SetMAXPROCS() { nc := runtime.NumCPU() - ll.Printf("%s v1:%v v2:%v\n", exampid, "main number of CPUs is:", nc) - c := runtime.GOMAXPROCS(nc) - ll.Printf("%s v1:%v v2:%v\n", exampid, "main previous number of GOMAXPROCS is:", c) - ll.Printf("%s v1:%v v2:%v\n", exampid, "main current number of GOMAXPROCS is:", runtime.GOMAXPROCS(-1)) + ll.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n", + exampid, tag, sngecomm.Lcs, + nc) + gmp := runtime.GOMAXPROCS(nc) + ll.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n", + exampid, tag, sngecomm.Lcs, + gmp) + ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n", + exampid, tag, sngecomm.Lcs, + runtime.GOMAXPROCS(-1)) } // sw = sngecomm.SendWait() rw = sngecomm.RecvWait() sf = sngecomm.SendFactor() rf = sngecomm.RecvFactor() - ll.Printf("%s v1:%v v2:%v v3:%v v4:%v v5:%v\n", exampid, "main Sleep Factors", "send", sf, "recv", rf) + ll.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n", + exampid, tag, sngecomm.Lcs, + sw, rw, sf, rf) // numq := sngecomm.Nqs() nmsgs = senv.Nmsgs() // message count // - ll.Printf("%s v1:%v\n", exampid, "main starting receivers") + ll.Printf("%stag:%s connsess:%s main_starting_receivers\n", + exampid, tag, sngecomm.Lcs) for q := 1; q <= numq; q++ { wgr.Add(1) go runReceiver(q) } - ll.Printf("%s v1:%v\n", exampid, "main started receivers") + ll.Printf("%stag:%s connsess:%s main_started_receivers\n", + exampid, tag, sngecomm.Lcs) // - ll.Printf("%s v1:%v\n", exampid, "main starting senders") + ll.Printf("%stag:%s connsess:%s main_starting_senders\n", + exampid, tag, sngecomm.Lcs) for q := 1; q <= numq; q++ { wgs.Add(1) go runSender(q) } - ll.Printf("%s v1:%v\n", exampid, "main started senders") + ll.Printf("%stag:%s connsess:%s main_started_senders\n", + exampid, tag, sngecomm.Lcs) // wgs.Wait() - ll.Printf("%s v1:%v\n", exampid, "main senders complete") + ll.Printf("%stag:%s connsess:%s main_senders_complete\n", + exampid, tag, sngecomm.Lcs) wgr.Wait() - ll.Printf("%s v1:%v\n", exampid, "main receivers complete") + ll.Printf("%stag:%s connsess:%s main_receivers_complete\n", + exampid, tag, sngecomm.Lcs) // - ll.Printf("%s v1:%v v2:%v\n", exampid, "main ends", time.Since(tn)) + + // The end + ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n", + exampid, tag, sngecomm.Lcs, + time.Now().Sub(st)) + } diff --git a/version.go b/version.go index de91495..d7b728f 100644 --- a/version.go +++ b/version.go @@ -35,7 +35,7 @@ var ( //patch = "3" // Patch - patch = "3.plvl.015" // Patch + patch = "3.plvl.016" // Patch ) func Version() string { -- 2.11.4.GIT