Modification level bump.
[stompngo.git] / data.go
blobf5508637c851c8ab7d6b5cfc5a93552102bf08b2
1 //
2 // Copyright © 2011-2018 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "bufio"
21 "log"
22 "net"
23 "sync"
24 "time"
// STOMP frame commands and the protocol level identifiers used by this
// package.
const (
	// Client generated commands.
	CONNECT     = "CONNECT"
	STOMP       = "STOMP"
	DISCONNECT  = "DISCONNECT"
	SEND        = "SEND"
	SUBSCRIBE   = "SUBSCRIBE"
	UNSUBSCRIBE = "UNSUBSCRIBE"
	ACK         = "ACK"
	NACK        = "NACK"
	BEGIN       = "BEGIN"
	COMMIT      = "COMMIT"
	ABORT       = "ABORT"

	// Server generated commands.
	CONNECTED = "CONNECTED"
	MESSAGE   = "MESSAGE"
	RECEIPT   = "RECEIPT"
	ERROR     = "ERROR"

	// Supported STOMP protocol definitions.
	SPL_10 = "1.0"
	SPL_11 = "1.1"
	SPL_12 = "1.2"
)
55 What this package currently supports.
57 var supported = []string{SPL_10, SPL_11, SPL_12}
// Headers definition, a slice of string.
//
// STOMP headers are key and value pairs. See the specification for more
// information about STOMP frame headers.
//
// Key values are found at even numbered indices. Values
// are found at odd numbered indices. Headers are validated for an even
// number of slice elements.
type Headers []string
72 Message is a STOMP Message, consisting of: a STOMP command; a set of STOMP
73 Headers; and a message body(payload), which is possibly empty.
75 type Message struct {
76 Command string
77 Headers Headers
78 Body []uint8
82 Frame is an alternate name for a Message.
84 type Frame Message
87 MessageData passed to the client, containing: the Message; and an Error
88 value which is possibly nil.
90 Note that this has no relevance on whether a MessageData.Message.Command
91 value contains an "ERROR" generated by the broker.
93 type MessageData struct {
94 Message Message
95 Error error
99 This is outbound on the wire.
101 type wiredata struct {
102 frame Frame
103 errchan chan error
107 Stomper is an interface that models STOMP specification commands.
109 type Stomper interface {
110 Abort(h Headers) error
111 Ack(headers Headers) error
112 Begin(h Headers) error
113 Commit(h Headers) error
114 Disconnect(headers Headers) error
115 Nack(headers Headers) error
116 Send(Headers, string) error
117 Subscribe(headers Headers) (<-chan MessageData, error)
118 Unsubscribe(headers Headers) error
120 SendBytes(h Headers, b []byte) error
// StatsReader is an interface that models a reader for the statistics
// maintained by the stompngo package.
type StatsReader interface {
	FramesRead() int64
	BytesRead() int64
	FramesWritten() int64
	BytesWritten() int64
}
// HBDataReader is an interface that models a reader for the heart beat
// data maintained by the stompngo package.
type HBDataReader interface {
	SendTickerInterval() int64
	ReceiveTickerInterval() int64
	SendTickerCount() int64
	ReceiveTickerCount() int64
}
146 Deadliner is an interface that models the optional network deadline
147 functionality implemented by the stompngo package.
149 type Deadliner interface {
150 WriteDeadline(d time.Duration)
151 EnableWriteDeadline(e bool)
152 ExpiredNotification(enf ExpiredNotification)
153 IsWriteDeadlineEnabled() bool
154 ReadDeadline(d time.Duration)
155 EnableReadDeadline(e bool)
156 IsReadDeadlineEnabled() bool
157 ShortWriteRecovery(ro bool)
// Monitor is an interface that models monitoring a stompngo connection.
type Monitor interface {
	Connected() bool
	Session() string
	Protocol() string
	Running() time.Duration
	SubChanCap() int
}
// ParmHandler is an interface that models stompngo client parameter
// specification.
type ParmHandler interface {
	SetLogger(l *log.Logger)
	GetLogger() *log.Logger
	SetSubChanCap(nc int)
}
182 STOMPConnector is an interface that encapsulates the Connection struct.
184 type STOMPConnector interface {
185 Stomper
186 StatsReader
187 HBDataReader
188 Deadliner
189 Monitor
190 ParmHandler
195 Connection is a representation of a STOMP connection.
197 type Connection struct {
198 ConnectResponse *Message // Broker response (CONNECTED/ERROR) if physical connection successful.
199 DisconnectReceipt MessageData // If receipt requested on DISCONNECT.
200 MessageData <-chan MessageData // Inbound data for the client.
201 connected bool
202 connLock sync.Mutex // connected variable lock
203 session string
204 sessLock sync.Mutex // session variable lock
205 protocol string
206 protoLock sync.Mutex // protocol variable lock
207 input chan MessageData
208 output chan wiredata
209 netconn net.Conn
210 subs map[string]*subscription
211 subsLock sync.RWMutex
212 ssdc chan struct{} // System shutdown channel
213 abortOnce sync.Once // Ensure close ssdc once
214 wtrsdc chan struct{} // Special writer shutdown channel
215 hbd *heartBeatData
216 wtr *bufio.Writer
217 rdr *bufio.Reader
218 Hbrf bool // Indicates a heart beat read/receive failure, which is possibly transient. Valid for 1.1+ only.
219 Hbsf bool // Indicates a heart beat send failure, which is possibly transient. Valid for 1.1+ only.
220 logger *log.Logger
221 mets *metrics // Client metrics
222 scc int // Subscribe channel capacity
223 discLock sync.Mutex // DISCONNECT lock
224 dld *deadlineData // Deadline data
227 type subscription struct {
228 md chan MessageData // Subscription specific MessageData channel
229 id string // Subscription id (unique, self reference)
230 am string // ACK mode for this subscription
231 cs bool // Closed during shutdown
232 drav bool // Drain After value validity
233 dra uint // Start draining after # messages (MESSAGE frames)
234 drmc uint // Current drain count if draining
// Error definition: a string based error type used for the package's
// error constants.
type Error string

// Error constants.
const (
	// ERROR Frame returned by broker on connect.
	ECONERR = Error("broker returned ERROR frame, CONNECT")

	// ERRORs for Headers.
	EHDRLEN  = Error("unmatched headers, bad length")
	EHDRUTF8 = Error("header string not UTF8")
	EHDRNIL  = Error("headers can not be nil")
	EUNKHDR  = Error("corrupt frame headers")
	EHDRMTK  = Error("header key can not be empty")
	EHDRMTV  = Error("header value can not be empty")

	// ERRORs for response to CONNECT.
	EUNKFRM  = Error("unrecognized frame returned, CONNECT")
	EBADFRM  = Error("Malformed frame")
	EBADSSLP = Error("Got HandShake data, wrong SSL port?") // message typo "worng" corrected

	// No body allowed error
	EBDYDATA = Error("body data not allowed")

	// Not connected.
	ECONBAD = Error("no current connection or DISCONNECT previously completed")

	// Destination required
	EREQDSTSND = Error("destination required, SEND")
	EREQDSTSUB = Error("destination required, SUBSCRIBE")
	EREQDIUNS  = Error("destination required, UNSUBSCRIBE")
	EREQDSTUNS = Error("destination required, UNSUBSCRIBE") // Alternate name

	// id required
	EREQIDUNS = Error("id required, UNSUBSCRIBE")

	// Message ID required.
	EREQMIDACK = Error("message-id required, ACK") // 1.0, 1.1
	EREQIDACK  = Error("id required, ACK")         // 1.2

	// Subscription required.
	EREQSUBACK = Error("subscription required, ACK") // 1.1

	// NACK's.  STOMP 1.1 or greater.
	EREQMIDNAK = Error("message-id required, NACK")   // 1.1
	EREQSUBNAK = Error("subscription required, NACK") // 1.1
	EREQIDNAK  = Error("id required, NACK")           // 1.2

	// Transaction ID required.
	EREQTIDBEG = Error("transaction-id required, BEGIN")
	EREQTIDCOM = Error("transaction-id required, COMMIT")
	EREQTIDABT = Error("transaction-id required, ABORT")

	// Transaction ID present but empty.
	ETIDBEGEMT = Error("transaction-id empty, BEGIN")
	ETIDCOMEMT = Error("transaction-id empty, COMMIT")
	ETIDABTEMT = Error("transaction-id empty, ABORT")

	// Host header required, STOMP 1.1+
	EREQHOST = Error("host header required for STOMP 1.1+")

	// Subscription errors.
	EDUPSID = Error("duplicate subscription-id")
	EBADSID = Error("invalid subscription-id")

	// Subscribe errors.
	ESBADAM = Error("invalid ackmode, SUBSCRIBE")

	// Unsubscribe error.
	EUNOSID  = Error("id required, UNSUBSCRIBE")
	EUNODSID = Error("destination or id required, UNSUBSCRIBE") // 1.0

	// Unsupported version error.
	EBADVERCLI = Error("unsupported protocol version, client")
	EBADVERSVR = Error("unsupported protocol version, server")
	EBADVERNAK = Error("unsupported protocol version, NACK")

	// Unsupported Headers type.
	EBADHDR = Error("unsupported Headers type")

	// Receipt not allowed on connect
	ENORECPT = Error("receipt not allowed on CONNECT")

	// Invalid broker command
	EINVBCMD = Error("invalid broker command")
)
// NULLBUFF is a zero length buffer for convenience.
var NULLBUFF = make([]uint8, 0)
334 A no disconnect receipt Headers value for convenience.
336 var NoDiscReceipt = Headers{"noreceipt", "true"}
// codecdata is the codec data structure definition: one encoded form and
// the decoded form it corresponds to.
type codecdata struct {
	encoded string
	decoded string
}

// codecValues holds the STOMP specification defined encoded / decoded values
// for the Message command and headers.
var codecValues = []codecdata{
	{"\\\\", "\\"},
	{"\\" + "n", "\n"},
	{"\\" + "r", "\r"},
	{"\\c", ":"},
}
// heartBeatData is control data for initialization of heartbeats with
// STOMP 1.1+, and the subsequent control of any heartbeat routines.
type heartBeatData struct {
	sdl  sync.Mutex // Send data lock
	rdl  sync.Mutex // Receive data lock
	clk  sync.Mutex // Shutdown lock
	ssdn bool       // Shutdown complete

	cx int64 // client send value, ms
	cy int64 // client receive value, ms
	sx int64 // server send value, ms
	sy int64 // server receive value, ms

	hbs bool // sending heartbeats
	hbr bool // receiving heartbeats

	sti int64 // local sender ticker interval, ns
	rti int64 // local receiver ticker interval, ns

	sc int64 // local sender ticker count
	rc int64 // local receiver ticker count

	ssd chan struct{} // sender shutdown channel
	rsd chan struct{} // receiver shutdown channel

	ls int64 // last send time, ns
	lr int64 // last receive time, ns
}
// metrics is the control structure for basic client metrics.
type metrics struct {
	st  time.Time // Start Time
	tfr int64     // Total frame reads
	tbr int64     // Total bytes read
	tfw int64     // Total frame writes
	tbw int64     // Total bytes written
}
400 Valid broker commands.
402 var validCmds = map[string]bool{MESSAGE: true, ERROR: true, RECEIPT: true}
// logLock is a package level mutex.
// NOTE(review): presumably serializes log output - confirm against its users.
var logLock sync.Mutex
// Network protocol names.
const (
	NetProtoTCP = "tcp" // Protocol Name
)
// HandShake is a fixed four byte sequence.
// NOTE(review): presumably the leading bytes seen when a plain text client
// reaches an SSL/TLS port (compare the EBADSSLP error text) - confirm.
var HandShake = []byte{0x15, 0x03, 0x03, 0x00}
// Common Header keys
const (
	HK_ACCEPT_VERSION = "accept-version"
	HK_ACK            = "ack"
	HK_CONTENT_TYPE   = "content-type"
	HK_CONTENT_LENGTH = "content-length"
	HK_DESTINATION    = "destination"
	HK_HEART_BEAT     = "heart-beat"
	HK_HOST           = "host" // HK_VHOST alias
	HK_ID             = "id"
	HK_LOGIN          = "login"
	HK_MESSAGE        = "message"
	HK_MESSAGE_ID     = "message-id"
	HK_SUPPRESS_CL    = "suppress-content-length" // Not in any spec, but used
	HK_SUPPRESS_CT    = "suppress-content-type"   // Not in any spec, but used
	HK_PASSCODE       = "passcode"
	HK_RECEIPT        = "receipt"
	HK_RECEIPT_ID     = "receipt-id"
	HK_SESSION        = "session"
	HK_SERVER         = "server"
	HK_SUBSCRIPTION   = "subscription"
	HK_TRANSACTION    = "transaction"
	HK_VERSION        = "version"
	HK_VHOST          = "host" // HK_HOST alias
)
// ACK Modes
const (
	AckModeAuto             = "auto"
	AckModeClient           = "client"
	AckModeClientIndividual = "client-individual"
)

// Lookup sets of ack modes.
// NOTE(review): the names suggest validAckModes10 holds the modes valid at
// protocol level 1.0 and validAckModes1x the mode added for later levels -
// confirm against the SUBSCRIBE validation code.
var (
	validAckModes10 = map[string]bool{AckModeAuto: true,
		AckModeClient: true}
	validAckModes1x = map[string]bool{AckModeClientIndividual: true}
)
// Default content-type.
const (
	DFLT_CONTENT_TYPE = "text/plain; charset=UTF-8"
)
// Extensions to STOMP protocol.
const (
	StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header
	StompPlusDrainNow   = "sng_drnow"   // UNSUBSCRIBE Header
)
470 var (
471 LFB = []byte("\n")
472 ZRB = []byte{0}