2 // Copyright © 2011-2019 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
29 // Client generated commands.
32 DISCONNECT
= "DISCONNECT"
34 SUBSCRIBE
= "SUBSCRIBE"
35 UNSUBSCRIBE
= "UNSUBSCRIBE"
42 // Server generated commands.
43 CONNECTED
= "CONNECTED"
48 // Supported STOMP protocol definitions.
55 What this package currently supports.
57 var supported
= []string{SPL_10
, SPL_11
, SPL_12
}
60 Headers definition, a slice of string.
62 STOMP headers are key and value pairs. See the specification for more
63 information about STOMP frame headers.
65 Keys are found at even numbered indices. Values
66 are found at odd numbered indices. Headers are validated for an even
67 number of slice elements.
72 Message is a STOMP Message, consisting of: a STOMP command; a set of STOMP
73 Headers; and a message body(payload), which is possibly empty.
82 Frame is an alternate name for a Message.
87 MessageData passed to the client, containing: the Message; and an Error
88 value which is possibly nil.
90 Note that the Error value has no bearing on whether a MessageData.Message.Command
91 value contains an "ERROR" generated by the broker.
93 type MessageData
struct {
99 This is outbound on the wire.
101 type wiredata
struct {
107 Stomper is an interface that models STOMP specification commands.
109 type Stomper
interface {
110 Abort(h Headers
) error
111 Ack(headers Headers
) error
112 Begin(h Headers
) error
113 Commit(h Headers
) error
114 Disconnect(headers Headers
) error
115 Nack(headers Headers
) error
116 Send(Headers
, string) error
117 Subscribe(headers Headers
) (<-chan MessageData
, error
)
118 Unsubscribe(headers Headers
) error
120 SendBytes(h Headers
, b
[]byte) error
124 StatsReader is an interface that models a reader for the statistics
125 maintained by the stompngo package.
127 type StatsReader
interface {
130 FramesWritten() int64
135 HBDataReader is an interface that models a reader for the heart beat
136 data maintained by the stompngo package.
138 type HBDataReader
interface {
139 SendTickerInterval() int64
140 ReceiveTickerInterval() int64
141 SendTickerCount() int64
142 ReceiveTickerCount() int64
146 Deadliner is an interface that models the optional network deadline
147 functionality implemented by the stompngo package.
149 type Deadliner
interface {
150 WriteDeadline(d time
.Duration
)
151 EnableWriteDeadline(e
bool)
152 ExpiredNotification(enf ExpiredNotification
)
153 IsWriteDeadlineEnabled() bool
154 ReadDeadline(d time
.Duration
)
155 EnableReadDeadline(e
bool)
156 IsReadDeadlineEnabled() bool
157 ShortWriteRecovery(ro
bool)
161 Monitor is an interface that models monitoring a stompngo connection.
163 type Monitor
interface {
167 Running() time
.Duration
172 ParmHandler is an interface that models stompngo client parameter
175 type ParmHandler
interface {
176 SetLogger(l
*log
.Logger
)
177 GetLogger() *log
.Logger
178 SetSubChanCap(nc
int)
182 STOMPConnector is an interface that encapsulates the Connection struct.
184 type STOMPConnector
interface {
195 Connection is a representation of a STOMP connection.
197 type Connection
struct {
198 ConnectResponse
*Message
// Broker response (CONNECTED/ERROR) if physical connection successful.
199 DisconnectReceipt MessageData
// If receipt requested on DISCONNECT.
200 MessageData
<-chan MessageData
// Inbound data for the client.
202 connLock sync
.Mutex
// connected variable lock
204 sessLock sync
.Mutex
// session variable lock
206 protoLock sync
.Mutex
// protocol variable lock
207 input
chan MessageData
210 subs
map[string]*subscription
211 subsLock sync
.RWMutex
212 ssdc
chan struct{} // System shutdown channel
213 abortOnce sync
.Once
// Ensure close ssdc once
214 wtrsdc
chan struct{} // Special writer shutdown channel
218 Hbrf
bool // Indicates a heart beat read/receive failure, which is possibly transient. Valid for 1.1+ only.
219 Hbsf
bool // Indicates a heart beat send failure, which is possibly transient. Valid for 1.1+ only.
221 mets
*metrics
// Client metrics
222 scc
int // Subscribe channel capacity
223 discLock sync
.Mutex
// DISCONNECT lock
224 dld
*deadlineData
// Deadline data
227 type subscription
struct {
228 md
chan MessageData
// Subscription specific MessageData channel
229 id
string // Subscription id (unique, self reference)
230 am
string // ACK mode for this subscription
231 cs
bool // Closed during shutdown
232 drav
bool // Drain After value validity
233 dra
uint // Start draining after # messages (MESSAGE frames)
234 drmc
uint // Current drain count if draining
246 // ERROR Frame returned by broker on connect.
247 ECONERR
= Error("broker returned ERROR frame, CONNECT")
249 // ERRORs for Headers.
250 EHDRLEN
= Error("unmatched headers, bad length")
251 EHDRUTF8
= Error("header string not UTF8")
252 EHDRNIL
= Error("headers can not be nil")
253 EUNKHDR
= Error("corrupt frame headers")
254 EHDRMTK
= Error("header key can not be empty")
255 EHDRMTV
= Error("header value can not be empty")
257 // ERRORs for response to CONNECT.
258 EUNKFRM
= Error("unrecognized frame returned, CONNECT")
259 EBADFRM
= Error("Malformed frame")
260 EBADSSLP
= Error("Got HandShake data, wrong SSL port?")
262 // No body allowed error
263 EBDYDATA
= Error("body data not allowed")
266 ECONBAD
= Error("no current connection or DISCONNECT previously completed")
268 // Destination required
269 EREQDSTSND
= Error("destination required, SEND")
270 EREQDSTSUB
= Error("destination required, SUBSCRIBE")
271 EREQDIUNS
= Error("destination required, UNSUBSCRIBE")
272 EREQDSTUNS
= Error("destination required, UNSUBSCRIBE") // Alternate name
275 EREQIDUNS
= Error("id required, UNSUBSCRIBE")
277 // Message ID required.
278 EREQMIDACK
= Error("message-id required, ACK") // 1.0, 1.1
279 EREQIDACK
= Error("id required, ACK") // 1.2
281 // Subscription required.
282 EREQSUBACK
= Error("subscription required, ACK") // 1.1
284 // NACK's. STOMP 1.1 or greater.
285 EREQMIDNAK
= Error("message-id required, NACK") // 1.1
286 EREQSUBNAK
= Error("subscription required, NACK") // 1.1
287 EREQIDNAK
= Error("id required, NACK") // 1.2
289 // Transaction ID required.
290 EREQTIDBEG
= Error("transaction-id required, BEGIN")
291 EREQTIDCOM
= Error("transaction-id required, COMMIT")
292 EREQTIDABT
= Error("transaction-id required, ABORT")
294 // Transaction ID present but empty.
295 ETIDBEGEMT
= Error("transaction-id empty, BEGIN")
296 ETIDCOMEMT
= Error("transaction-id empty, COMMIT")
297 ETIDABTEMT
= Error("transaction-id empty, ABORT")
299 // Host header required, STOMP 1.1+
300 EREQHOST
= Error("host header required for STOMP 1.1+")
302 // Subscription errors.
303 EDUPSID
= Error("duplicate subscription-id")
304 EBADSID
= Error("invalid subscription-id")
307 ESBADAM
= Error("invalid ackmode, SUBSCRIBE")
309 // Unsubscribe error.
310 EUNOSID
= Error("id required, UNSUBSCRIBE")
311 EUNODSID
= Error("destination or id required, UNSUBSCRIBE") // 1.0
313 // Unsupported version error.
314 EBADVERCLI
= Error("unsupported protocol version, client")
315 EBADVERSVR
= Error("unsupported protocol version, server")
316 EBADVERNAK
= Error("unsupported protocol version, NACK")
318 // Unsupported Headers type.
319 EBADHDR
= Error("unsupported Headers type")
321 // Receipt not allowed on connect
322 ENORECPT
= Error("receipt not allowed on CONNECT")
324 // Invalid broker command
325 EINVBCMD
= Error("invalid broker command")
327 // Invalid receipt-id string
328 EBADRID
= Error("invalid receipt-id")
// NULLBUFF is a zero length buffer for convenience. It is allocated with
// make, so it is empty but not nil.
var NULLBUFF = make([]uint8, 0)
337 A no disconnect receipt Headers value for convenience.
339 var NoDiscReceipt
= Headers
{"noreceipt", "true"}
342 Codec data structure definition.
344 type codecdata
struct {
350 STOMP specification defined encoded / decoded values for the Message
353 var codecValues
= []codecdata
{
354 codecdata
{"\\\\", "\\"},
355 codecdata
{"\\" + "n", "\n"},
356 codecdata
{"\\" + "r", "\r"},
357 codecdata
{"\\c", ":"},
361 Control data for initialization of heartbeats with STOMP 1.1+, and the
362 subsequent control of any heartbeat routines.
364 type heartBeatData
struct {
365 sdl sync
.Mutex
// Send data lock
366 rdl sync
.Mutex
// Receive data lock
367 clk sync
.Mutex
// Shutdown lock
368 ssdn
bool // Shutdown complete
370 cx
int64 // client send value, ms
371 cy
int64 // client receive value, ms
372 sx
int64 // server send value, ms
373 sy
int64 // server receive value, ms
375 hbs
bool // sending heartbeats
376 hbr
bool // receiving heartbeats
378 sti
int64 // local sender ticker interval, ns
379 rti
int64 // local receiver ticker interval, ns
381 sc
int64 // local sender ticker count
382 rc
int64 // local receiver ticker count
384 ssd
chan struct{} // sender shutdown channel
385 rsd
chan struct{} // receiver shutdown channel
387 ls
int64 // last send time, ns
388 lr
int64 // last receive time, ns
392 Control structure for basic client metrics.
394 type metrics
struct {
395 st time
.Time
// Start Time
396 tfr
int64 // Total frame reads
397 tbr
int64 // Total bytes read
398 tfw
int64 // Total frame writes
399 tbw
int64 // Total bytes written
403 Valid broker commands.
405 var validCmds
= map[string]bool{MESSAGE
: true, ERROR
: true, RECEIPT
: true}
// logLock is a package-level mutex; its zero value is ready to use.
// NOTE(review): presumably serializes access to the logger — confirm
// against the logging code.
var logLock sync.Mutex
410 NetProtoTCP
= "tcp" // Protocol Name
// HandShake is the four byte sequence 0x15, 0x03, 0x03, 0x00.
// NOTE(review): presumably compared against inbound data to detect a
// connection made to an SSL port (see the EBADSSLP error text) — confirm
// against the reader code.
var HandShake = []byte{0x15, 0x03, 0x03, 0x00}
419 HK_ACCEPT_VERSION
= "accept-version"
421 HK_CONTENT_TYPE
= "content-type"
422 HK_CONTENT_LENGTH
= "content-length"
423 HK_DESTINATION
= "destination"
424 HK_HEART_BEAT
= "heart-beat"
425 HK_HOST
= "host" // HK_VHOST alias
428 HK_MESSAGE
= "message"
429 HK_MESSAGE_ID
= "message-id"
430 HK_SUPPRESS_CL
= "suppress-content-length" // Not in any spec, but used
431 HK_SUPPRESS_CT
= "suppress-content-type" // Not in any spec, but used
432 HK_PASSCODE
= "passcode"
433 HK_RECEIPT
= "receipt"
434 HK_RECEIPT_ID
= "receipt-id"
435 HK_SESSION
= "session"
437 HK_SUBSCRIPTION
= "subscription"
438 HK_TRANSACTION
= "transaction"
439 HK_VERSION
= "version"
440 HK_VHOST
= "host" // HK_HOST alias
448 AckModeClient
= "client"
449 AckModeClientIndividual
= "client-individual"
453 validAckModes10
= map[string]bool{AckModeAuto
: true,
455 validAckModes1x
= map[string]bool{AckModeClientIndividual
: true}
459 Default content-type.
462 DFLT_CONTENT_TYPE
= "text/plain; charset=UTF-8"
466 Extensions to STOMP protocol.
469 StompPlusDrainAfter
= "sng_drafter" // SUBSCRIBE Header
470 StompPlusDrainNow
= "sng_drnow" // UNSUBSCRIBE Header