Address issue #50.
[stompngo.git] / data.go
blob64d2b43a604e064581aafdb71e440f670cdcfce3
1 //
2 // Copyright © 2011-2019 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "bufio"
21 "log"
22 "net"
23 "sync"
24 "time"
// STOMP frame command names and the protocol levels known to this package.
const (
	// Client generated commands.
	CONNECT     = "CONNECT"
	STOMP       = "STOMP"
	DISCONNECT  = "DISCONNECT"
	SEND        = "SEND"
	SUBSCRIBE   = "SUBSCRIBE"
	UNSUBSCRIBE = "UNSUBSCRIBE"
	ACK         = "ACK"
	NACK        = "NACK"
	BEGIN       = "BEGIN"
	COMMIT      = "COMMIT"
	ABORT       = "ABORT"

	// Server generated commands.
	CONNECTED = "CONNECTED"
	MESSAGE   = "MESSAGE"
	RECEIPT   = "RECEIPT"
	ERROR     = "ERROR"

	// Supported STOMP protocol definitions.
	SPL_10 = "1.0"
	SPL_11 = "1.1"
	SPL_12 = "1.2"
)
55 What this package currently supports.
57 var supported = []string{SPL_10, SPL_11, SPL_12}
// Headers is the set of STOMP headers for a frame, held as a flat slice of
// strings.
//
// STOMP headers are key and value pairs. See the specification for more
// information about STOMP frame headers.
//
// Keys are found at even numbered indices, and each key's value at the odd
// numbered index that follows it. Headers are validated for an even number
// of slice elements.
type Headers []string
72 Message is a STOMP Message, consisting of: a STOMP command; a set of STOMP
73 Headers; and a message body(payload), which is possibly empty.
75 type Message struct {
76 Command string
77 Headers Headers
78 Body []uint8
82 Frame is an alternate name for a Message.
84 type Frame Message
87 MessageData passed to the client, containing: the Message; and an Error
88 value which is possibly nil.
90 Note that this has no relevance on whether a MessageData.Message.Command
91 value contains an "ERROR" generated by the broker.
93 type MessageData struct {
94 Message Message
95 Error error
99 This is outbound on the wire.
101 type wiredata struct {
102 frame Frame
103 errchan chan error
107 Stomper is an interface that models STOMP specification commands.
109 type Stomper interface {
110 Abort(h Headers) error
111 Ack(headers Headers) error
112 Begin(h Headers) error
113 Commit(h Headers) error
114 Disconnect(headers Headers) error
115 Nack(headers Headers) error
116 Send(Headers, string) error
117 Subscribe(headers Headers) (<-chan MessageData, error)
118 Unsubscribe(headers Headers) error
120 SendBytes(h Headers, b []byte) error
// StatsReader is an interface that models a reader for the statistics
// maintained by the stompngo package.
type StatsReader interface {
	FramesRead() int64
	BytesRead() int64
	FramesWritten() int64
	BytesWritten() int64
}
// HBDataReader is an interface that models a reader for the heart beat
// data maintained by the stompngo package.
type HBDataReader interface {
	SendTickerInterval() int64
	ReceiveTickerInterval() int64
	SendTickerCount() int64
	ReceiveTickerCount() int64
}
146 Deadliner is an interface that models the optional network deadline
147 functionality implemented by the stompngo package.
149 type Deadliner interface {
150 WriteDeadline(d time.Duration)
151 EnableWriteDeadline(e bool)
152 ExpiredNotification(enf ExpiredNotification)
153 IsWriteDeadlineEnabled() bool
154 ReadDeadline(d time.Duration)
155 EnableReadDeadline(e bool)
156 IsReadDeadlineEnabled() bool
157 ShortWriteRecovery(ro bool)
// Monitor is an interface that models monitoring a stompngo connection.
type Monitor interface {
	Connected() bool
	Session() string
	Protocol() string
	Running() time.Duration
	SubChanCap() int
}
// ParmHandler is an interface that models stompngo client parameter
// specification.
type ParmHandler interface {
	SetLogger(l *log.Logger)
	GetLogger() *log.Logger
	SetSubChanCap(nc int)
}
182 STOMPConnector is an interface that encapsulates the Connection struct.
184 type STOMPConnector interface {
185 Stomper
186 StatsReader
187 HBDataReader
188 Deadliner
189 Monitor
190 ParmHandler
195 Connection is a representation of a STOMP connection.
197 type Connection struct {
198 ConnectResponse *Message // Broker response (CONNECTED/ERROR) if physical connection successful.
199 DisconnectReceipt MessageData // If receipt requested on DISCONNECT.
200 MessageData <-chan MessageData // Inbound data for the client.
201 connected bool
202 connLock sync.Mutex // connected variable lock
203 session string
204 sessLock sync.Mutex // session variable lock
205 protocol string
206 protoLock sync.Mutex // protocol variable lock
207 input chan MessageData
208 output chan wiredata
209 netconn net.Conn
210 subs map[string]*subscription
211 subsLock sync.RWMutex
212 ssdc chan struct{} // System shutdown channel
213 abortOnce sync.Once // Ensure close ssdc once
214 wtrsdc chan struct{} // Special writer shutdown channel
215 hbd *heartBeatData
216 wtr *bufio.Writer
217 rdr *bufio.Reader
218 Hbrf bool // Indicates a heart beat read/receive failure, which is possibly transient. Valid for 1.1+ only.
219 Hbsf bool // Indicates a heart beat send failure, which is possibly transient. Valid for 1.1+ only.
220 logger *log.Logger
221 mets *metrics // Client metrics
222 scc int // Subscribe channel capacity
223 discLock sync.Mutex // DISCONNECT lock
224 dld *deadlineData // Deadline data
227 type subscription struct {
228 md chan MessageData // Subscription specific MessageData channel
229 id string // Subscription id (unique, self reference)
230 am string // ACK mode for this subscription
231 cs bool // Closed during shutdown
232 drav bool // Drain After value validity
233 dra uint // Start draining after # messages (MESSAGE frames)
234 drmc uint // Current drain count if draining
// Error is the string based type of the error values defined by this
// package.
//
// NOTE(review): the method that makes Error satisfy the built-in error
// interface is not declared in this part of the file - presumably it lives
// elsewhere in the package; confirm.
type Error string
243 Error constants.
245 const (
246 // ERROR Frame returned by broker on connect.
247 ECONERR = Error("broker returned ERROR frame, CONNECT")
249 // ERRORs for Headers.
250 EHDRLEN = Error("unmatched headers, bad length")
251 EHDRUTF8 = Error("header string not UTF8")
252 EHDRNIL = Error("headers can not be nil")
253 EUNKHDR = Error("corrupt frame headers")
254 EHDRMTK = Error("header key can not be empty")
255 EHDRMTV = Error("header value can not be empty")
257 // ERRORs for response to CONNECT.
258 EUNKFRM = Error("unrecognized frame returned, CONNECT")
259 EBADFRM = Error("Malformed frame")
260 EBADSSLP = Error("Got HandShake data, wrong SSL port?")
262 // No body allowed error
263 EBDYDATA = Error("body data not allowed")
265 // Not connected.
266 ECONBAD = Error("no current connection or DISCONNECT previously completed")
268 // Destination required
269 EREQDSTSND = Error("destination required, SEND")
270 EREQDSTSUB = Error("destination required, SUBSCRIBE")
271 EREQDIUNS = Error("destination required, UNSUBSCRIBE")
272 EREQDSTUNS = Error("destination required, UNSUBSCRIBE") // Alternate name
274 // id required
275 EREQIDUNS = Error("id required, UNSUBSCRIBE")
277 // Message ID required.
278 EREQMIDACK = Error("message-id required, ACK") // 1.0, 1.1
279 EREQIDACK = Error("id required, ACK") // 1.2
281 // Subscription required.
282 EREQSUBACK = Error("subscription required, ACK") // 1.1
284 // NACK's. STOMP 1.1 or greater.
285 EREQMIDNAK = Error("message-id required, NACK") // 1.1
286 EREQSUBNAK = Error("subscription required, NACK") // 1.1
287 EREQIDNAK = Error("id required, NACK") // 1.2
289 // Transaction ID required.
290 EREQTIDBEG = Error("transaction-id required, BEGIN")
291 EREQTIDCOM = Error("transaction-id required, COMMIT")
292 EREQTIDABT = Error("transaction-id required, ABORT")
294 // Transaction ID present but empty.
295 ETIDBEGEMT = Error("transaction-id empty, BEGIN")
296 ETIDCOMEMT = Error("transaction-id empty, COMMIT")
297 ETIDABTEMT = Error("transaction-id empty, ABORT")
299 // Host header required, STOMP 1.1+
300 EREQHOST = Error("host header required for STOMP 1.1+")
302 // Subscription errors.
303 EDUPSID = Error("duplicate subscription-id")
304 EBADSID = Error("invalid subscription-id")
306 // Subscribe errors.
307 ESBADAM = Error("invalid ackmode, SUBSCRIBE")
309 // Unsubscribe error.
310 EUNOSID = Error("id required, UNSUBSCRIBE")
311 EUNODSID = Error("destination or id required, UNSUBSCRIBE") // 1.0
313 // Unsupported version error.
314 EBADVERCLI = Error("unsupported protocol version, client")
315 EBADVERSVR = Error("unsupported protocol version, server")
316 EBADVERNAK = Error("unsupported protocol version, NACK")
318 // Unsupported Headers type.
319 EBADHDR = Error("unsupported Headers type")
321 // Receipt not allowed on connect
322 ENORECPT = Error("receipt not allowed on CONNECT")
324 // Invalid broker command
325 EINVBCMD = Error("invalid broker command")
327 // Invalid receipt-id string
328 EBADRID = Error("invalid receipt-id")
// NULLBUFF is a zero length buffer for convenience. It is non-nil.
var NULLBUFF = make([]uint8, 0)
337 A no disconnect receipt Headers value for convenience.
339 var NoDiscReceipt = Headers{"noreceipt", "true"}
// codecdata is the codec data structure definition: one encoded / decoded
// pair of strings.
type codecdata struct {
	encoded string // Form as it appears on the wire
	decoded string // Form after decoding
}
350 STOMP specification defined encoded / decoded values for the Message
351 command and headers.
353 var codecValues = []codecdata{
354 codecdata{"\\\\", "\\"},
355 codecdata{"\\" + "n", "\n"},
356 codecdata{"\\" + "r", "\r"},
357 codecdata{"\\c", ":"},
// heartBeatData is the control data for initialization of heartbeats with
// STOMP 1.1+, and the subsequent control of any heartbeat routines.
//
// It contains mutexes and so must not be copied after first use.
type heartBeatData struct {
	sdl  sync.Mutex // Send data lock
	rdl  sync.Mutex // Receive data lock
	clk  sync.Mutex // Shutdown lock
	ssdn bool       // Shutdown complete

	cx int64 // client send value, ms
	cy int64 // client receive value, ms
	sx int64 // server send value, ms
	sy int64 // server receive value, ms

	hbs bool // sending heartbeats
	hbr bool // receiving heartbeats

	sti int64 // local sender ticker interval, ns
	rti int64 // local receiver ticker interval, ns

	sc int64 // local sender ticker count
	rc int64 // local receiver ticker count

	ssd chan struct{} // sender shutdown channel
	rsd chan struct{} // receiver shutdown channel

	ls int64 // last send time, ns
	lr int64 // last receive time, ns
}
// metrics is the control structure for basic client metrics.
type metrics struct {
	st  time.Time // Start Time
	tfr int64     // Total frame reads
	tbr int64     // Total bytes read
	tfw int64     // Total frame writes
	tbw int64     // Total bytes written
}
403 Valid broker commands.
405 var validCmds = map[string]bool{MESSAGE: true, ERROR: true, RECEIPT: true}
// logLock is a package level mutex.
//
// NOTE(review): presumably serializes log output across connections - its
// users are not visible in this part of the file; confirm.
var logLock sync.Mutex
// Network protocol names.
const (
	NetProtoTCP = "tcp" // Protocol Name
)
// HandShake is the byte sequence 0x15, 0x03, 0x03, 0x00.
//
// NOTE(review): these bytes match the start of a TLS 1.2 alert record
// (content type 21, version 3.3). Presumably they are compared with the
// first bytes received after CONNECT to report EBADSSLP when a plain
// connection reaches a TLS port - confirm against the reader code.
var HandShake = []byte{0x15, 0x03, 0x03, 0x00}
416 Common Header keys
418 const (
419 HK_ACCEPT_VERSION = "accept-version"
420 HK_ACK = "ack"
421 HK_CONTENT_TYPE = "content-type"
422 HK_CONTENT_LENGTH = "content-length"
423 HK_DESTINATION = "destination"
424 HK_HEART_BEAT = "heart-beat"
425 HK_HOST = "host" // HK_VHOST aloas
426 HK_ID = "id"
427 HK_LOGIN = "login"
428 HK_MESSAGE = "message"
429 HK_MESSAGE_ID = "message-id"
430 HK_SUPPRESS_CL = "suppress-content-length" // Not in any spec, but used
431 HK_SUPPRESS_CT = "suppress-content-type" // Not in any spec, but used
432 HK_PASSCODE = "passcode"
433 HK_RECEIPT = "receipt"
434 HK_RECEIPT_ID = "receipt-id"
435 HK_SESSION = "session"
436 HK_SERVER = "server"
437 HK_SUBSCRIPTION = "subscription"
438 HK_TRANSACTION = "transaction"
439 HK_VERSION = "version"
440 HK_VHOST = "host" // HK_HOST alias
// ACK Modes.
const (
	AckModeAuto             = "auto"
	AckModeClient           = "client"
	AckModeClientIndividual = "client-individual"
)
452 var (
453 validAckModes10 = map[string]bool{AckModeAuto: true,
454 AckModeClient: true}
455 validAckModes1x = map[string]bool{AckModeClientIndividual: true}
// Default content-type.
const (
	DFLT_CONTENT_TYPE = "text/plain; charset=UTF-8"
)
// Extensions to STOMP protocol. These header keys are specific to this
// package.
const (
	StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header
	StompPlusDrainNow   = "sng_drnow"   // UNSUBSCRIBE Header
)
// Single byte slices kept at package level for convenience.
var (
	LFB = []byte("\n") // One line feed byte
	ZRB = []byte{0}    // One zero (NUL) byte
)