Address issue #50.
[stompngo.git] / writer.go
blob250d3bb6280c20e4a21c5a819fda4ed654ae1aa4
1 //
2 // Copyright © 2011-2019 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "bufio"
21 "bytes"
22 "net"
24 // "bytes"
25 "strconv"
26 "time"
30 Write data to logical network writer. Writer will take care of the output wire data.
31 If the underlying connection goes bad and writer give up working, the closed ssdc chan
32 will make sure write action aware that happens.
34 func (c *Connection) writeWireData(wd wiredata) error {
35 select {
36 case c.output <- wd:
37 case <-c.ssdc:
38 return ECONBAD
40 return nil
44 Logical network writer. Read wiredata structures from the communication
45 channel, and put the frame on the wire.
47 func (c *Connection) writer() {
48 writerLoop:
49 for {
50 select {
51 case d := <-c.output:
52 c.log("WTR_WIREWRITE start")
53 c.wireWrite(d)
54 logLock.Lock()
55 if c.logger != nil {
56 c.logx("WTR_WIREWRITE COMPLETE", d.frame.Command, d.frame.Headers,
57 HexData(d.frame.Body))
59 logLock.Unlock()
60 if d.frame.Command == DISCONNECT {
61 break writerLoop // we are done with this connection
63 case _ = <-c.ssdc:
64 c.log("WTR_WIREWRITE shutdown S received")
65 break writerLoop
66 case _ = <-c.wtrsdc:
67 c.log("WTR_WIREWRITE shutdown W received")
68 break writerLoop
70 } // of for
72 c.connLock.Lock()
73 c.connected = false
74 c.connLock.Unlock()
75 c.sysAbort()
76 c.log("WTR_SHUTDOWN", time.Now())
80 Connection logical write.
82 func (c *Connection) wireWrite(d wiredata) {
83 f := &d.frame
84 // fmt.Printf("WWD01 f:[%v]\n", f)
85 switch f.Command {
86 case "\n": // HeartBeat frame
87 if c.dld.wde && c.dld.wds {
88 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
90 _, e := c.wtr.WriteString(f.Command)
91 if e != nil {
92 if e.(net.Error).Timeout() {
93 if c.dld.dns {
94 c.dld.dlnotify(e, true)
97 d.errchan <- e
98 return
100 default: // Other frames
101 if e := f.writeFrame(c.wtr, c); e != nil {
102 d.errchan <- e
103 return
105 if e := c.wtr.Flush(); e != nil {
106 d.errchan <- e
107 return
110 if e := c.wtr.Flush(); e != nil {
111 d.errchan <- e
112 return
115 if c.hbd != nil {
116 c.hbd.sdl.Lock()
117 c.hbd.ls = time.Now().UnixNano() // Latest good send
118 c.hbd.sdl.Unlock()
120 c.mets.tfw++ // Frame written count
121 c.mets.tbw += f.Size(false) // Bytes written count
123 d.errchan <- nil
124 return
128 Physical frame write to the wire.
130 func (f *Frame) writeFrame(w *bufio.Writer, c *Connection) error {
132 var sctok bool
133 // Content type. Always add it if the client does not suppress and does not
134 // supply it.
135 _, sctok = f.Headers.Contains(HK_SUPPRESS_CT)
136 if !sctok {
137 if _, ctok := f.Headers.Contains(HK_CONTENT_TYPE); !ctok {
138 f.Headers = append(f.Headers, HK_CONTENT_TYPE,
139 DFLT_CONTENT_TYPE)
143 var sclok bool
144 // Content length - Always add it if client does not suppress it and
145 // does not supply it.
146 _, sclok = f.Headers.Contains(HK_SUPPRESS_CL)
147 if !sclok {
148 if _, clok := f.Headers.Contains(HK_CONTENT_LENGTH); !clok {
149 f.Headers = append(f.Headers, HK_CONTENT_LENGTH, strconv.Itoa(len(f.Body)))
152 // Encode the headers if needed
153 if c.Protocol() > SPL_10 && f.Command != CONNECT {
154 for i := 0; i < len(f.Headers); i += 2 {
155 f.Headers[i] = encode(f.Headers[i])
156 f.Headers[i+1] = encode(f.Headers[i+1])
160 if sclok {
161 nz := bytes.IndexByte(f.Body, 0)
162 // fmt.Printf("WDBG41 ok:%v\n", nz)
163 if nz == 0 {
164 f.Body = []byte{}
165 // fmt.Printf("WDBG42 body:%v bodystring: %v\n", f.Body, string(f.Body))
166 } else if nz > 0 {
167 f.Body = f.Body[0:nz]
168 // fmt.Printf("WDBG43 body:%v bodystring: %v\n", f.Body, string(f.Body))
172 if c.dld.wde && c.dld.wds {
173 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
176 // Writes start
178 // Write the frame Command
179 _, e := w.WriteString(f.Command + "\n")
180 if c.checkWriteError(e) != nil {
181 return e
183 // fmt.Println("WRCMD", f.Command)
184 // Write the frame Headers
185 for i := 0; i < len(f.Headers); i += 2 {
186 if c.dld.wde && c.dld.wds {
187 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
189 _, e := w.WriteString(f.Headers[i] + ":" + f.Headers[i+1] + "\n")
190 if c.checkWriteError(e) != nil {
191 return e
193 // fmt.Println("WRHDR", f.Headers[i]+":"+f.Headers[i+1]+"\n")
196 // Write the last Header LF
197 if c.dld.wde && c.dld.wds {
198 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
200 e = w.WriteByte('\n')
201 if c.checkWriteError(e) != nil {
202 return e
204 // fmt.Printf("WDBG40 ok:%v\n", sclok)
206 // Write the body
207 if len(f.Body) != 0 { // Foolish to write 0 length data
208 // fmt.Println("WRBDY", f.Body)
209 e := c.writeBody(f)
210 if c.checkWriteError(e) != nil {
211 return e
214 if c.dld.wde && c.dld.wds {
215 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
217 e = w.WriteByte(0)
218 if c.checkWriteError(e) != nil {
219 return e
221 // End of write loop - set no deadline
222 if c.dld.wde {
223 _ = c.netconn.SetWriteDeadline(c.dld.t0)
225 return nil
228 func (c *Connection) checkWriteError(e error) error {
229 if e == nil {
230 return e
232 ne, ok := e.(net.Error)
233 if !ok {
234 return e
236 if ne.Timeout() {
237 if c.dld.dns {
238 c.log("invoking write deadline callback 1")
239 c.dld.dlnotify(e, true)
242 return e
245 func (c *Connection) writeBody(f *Frame) error {
246 // fmt.Printf("WDBG99 body:%v bodystring: %v\n", f.Body, string(f.Body))
247 var n = 0
248 var e error
249 for {
250 if c.dld.wde && c.dld.wds {
251 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
253 n, e = c.wtr.Write(f.Body)
254 if n == len(f.Body) {
255 return e
257 c.log("SHORT WRITE", n, len(f.Body))
258 if n == 0 { // Zero bytes would mean something is seriously wrong.
259 return e
261 if !c.dld.rfsw {
262 return e
264 if c.dld.wde && c.dld.wds && c.dld.dns && isErrorTimeout(e) {
265 c.log("invoking write deadline callback 2")
266 c.dld.dlnotify(e, true)
268 // *Any* error from a bufio.Writer is *not* recoverable. See code in
269 // bufio.go to understand this. We get a new writer here, to clear any
270 // error condition.
271 c.wtr = bufio.NewWriter(c.netconn) // Create new writer
272 f.Body = f.Body[n:]
276 func isErrorTimeout(e error) bool {
277 if e == nil {
278 return false
280 _, ok := e.(net.Error)
281 if !ok {
282 return false
284 return true