2 // Copyright © 2011-2018 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed, an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
29 // Client generated commands.
32 DISCONNECT
= "DISCONNECT"
34 SUBSCRIBE
= "SUBSCRIBE"
35 UNSUBSCRIBE
= "UNSUBSCRIBE"
42 // Server generated commands.
43 CONNECTED
= "CONNECTED"
48 // Supported STOMP protocol definitions.
55 What this package currently supports.
57 var supported
= []string{SPL_10
, SPL_11
, SPL_12
}
60 Headers definition, a slice of string.
62 STOMP headers are key and value pairs. See the specification for more
63 information about STOMP frame headers.
65 Key values are found at even numbered indices. Values
66 are found at odd numbered indices. Headers are validated for an even
67 number of slice elements.
72 Message is a STOMP Message, consisting of: a STOMP command; a set of STOMP
73 Headers; and a message body(payload), which is possibly empty.
82 Frame is an alternate name for a Message.
87 MessageData passed to the client, containing: the Message; and an Error
88 value which is possibly nil.
90 Note that this has no relevance on whether a MessageData.Message.Command
91 value contains an "ERROR" generated by the broker.
93 type MessageData
struct {
99 This is outbound on the wire.
101 type wiredata
struct {
107 Stomper is an interface that models STOMP specification commands.
109 type Stomper
interface {
110 Abort(h Headers
) error
111 Ack(headers Headers
) error
112 Begin(h Headers
) error
113 Commit(h Headers
) error
114 Disconnect(headers Headers
) error
115 Nack(headers Headers
) error
116 Send(Headers
, string) error
117 Subscribe(headers Headers
) (<-chan MessageData
, error
)
118 Unsubscribe(headers Headers
) error
120 SendBytes(h Headers
, b
[]byte) error
124 StatsReader is an interface that modela a reader for the statistics
125 maintained by the stompngo package.
127 type StatsReader
interface {
130 FramesWritten() int64
135 HBDataReader is an interface that modela a reader for the heart beat
136 data maintained by the stompngo package.
138 type HBDataReader
interface {
139 SendTickerInterval() int64
140 ReceiveTickerInterval() int64
141 SendTickerCount() int64
142 ReceiveTickerCount() int64
146 Deadliner is an interface that models the optional network deadline
147 functionality implemented by the stompngo package.
149 type Deadliner
interface {
150 WriteDeadline(d time
.Duration
)
151 EnableWriteDeadline(e
bool)
152 ExpiredNotification(enf ExpiredNotification
)
153 IsWriteDeadlineEnabled() bool
154 ReadDeadline(d time
.Duration
)
155 EnableReadDeadline(e
bool)
156 IsReadDeadlineEnabled() bool
157 ShortWriteRecovery(ro
bool)
161 Monitor is an interface that models monitoring a stompngo connection.
163 type Monitor
interface {
167 Running() time
.Duration
172 ParmHandler is an interface that models stompngo client parameter
175 type ParmHandler
interface {
176 SetLogger(l
*log
.Logger
)
177 GetLogger() *log
.Logger
178 SetSubChanCap(nc
int)
182 STOMPConnector is an interface that encapsulates the Connection struct.
184 type STOMPConnector
interface {
195 Connection is a representation of a STOMP connection.
197 type Connection
struct {
198 ConnectResponse
*Message
// Broker response (CONNECTED/ERROR) if physical connection successful.
199 DisconnectReceipt MessageData
// If receipt requested on DISCONNECT.
200 MessageData
<-chan MessageData
// Inbound data for the client.
202 connLock sync
.Mutex
// connected variable lock
204 sessLock sync
.Mutex
// session variable lock
206 protoLock sync
.Mutex
// protocol variable lock
207 input
chan MessageData
210 subs
map[string]*subscription
211 subsLock sync
.RWMutex
212 ssdc
chan struct{} // System shutdown channel
213 abortOnce sync
.Once
// Ensure close ssdc once
214 wtrsdc
chan struct{} // Special writer shutdown channel
218 Hbrf
bool // Indicates a heart beat read/receive failure, which is possibly transient. Valid for 1.1+ only.
219 Hbsf
bool // Indicates a heart beat send failure, which is possibly transient. Valid for 1.1+ only.
221 mets
*metrics
// Client metrics
222 scc
int // Subscribe channel capacity
223 discLock sync
.Mutex
// DISCONNECT lock
224 dld
*deadlineData
// Deadline data
227 type subscription
struct {
228 md
chan MessageData
// Subscription specific MessageData channel
229 id
string // Subscription id (unique, self reference)
230 am
string // ACK mode for this subscription
231 cs
bool // Closed during shutdown
232 drav
bool // Drain After value validity
233 dra
uint // Start draining after # messages (MESSAGE frames)
234 drmc
uint // Current drain count if draining
246 // ERROR Frame returned by broker on connect.
247 ECONERR
= Error("broker returned ERROR frame, CONNECT")
249 // ERRORs for Headers.
250 EHDRLEN
= Error("unmatched headers, bad length")
251 EHDRUTF8
= Error("header string not UTF8")
252 EHDRNIL
= Error("headers can not be nil")
253 EUNKHDR
= Error("corrupt frame headers")
254 EHDRMTK
= Error("header key can not be empty")
255 EHDRMTV
= Error("header value can not be empty")
257 // ERRORs for response to CONNECT.
258 EUNKFRM
= Error("unrecognized frame returned, CONNECT")
259 EBADFRM
= Error("Malformed frame")
260 EBADSSLP
= Error("Got HandShake data, worng SSL port?")
262 // No body allowed error
263 EBDYDATA
= Error("body data not allowed")
266 ECONBAD
= Error("no current connection or DISCONNECT previously completed")
268 // Destination required
269 EREQDSTSND
= Error("destination required, SEND")
270 EREQDSTSUB
= Error("destination required, SUBSCRIBE")
271 EREQDIUNS
= Error("destination required, UNSUBSCRIBE")
272 EREQDSTUNS
= Error("destination required, UNSUBSCRIBE") // Alternate name
275 EREQIDUNS
= Error("id required, UNSUBSCRIBE")
277 // Message ID required.
278 EREQMIDACK
= Error("message-id required, ACK") // 1.0, 1.1
279 EREQIDACK
= Error("id required, ACK") // 1.2
281 // Subscription required.
282 EREQSUBACK
= Error("subscription required, ACK") // 1.1
284 // NACK's. STOMP 1.1 or greater.
285 EREQMIDNAK
= Error("message-id required, NACK") // 1.1
286 EREQSUBNAK
= Error("subscription required, NACK") // 1.1
287 EREQIDNAK
= Error("id required, NACK") // 1.2
289 // Transaction ID required.
290 EREQTIDBEG
= Error("transaction-id required, BEGIN")
291 EREQTIDCOM
= Error("transaction-id required, COMMIT")
292 EREQTIDABT
= Error("transaction-id required, ABORT")
294 // Transaction ID present but empty.
295 ETIDBEGEMT
= Error("transaction-id empty, BEGIN")
296 ETIDCOMEMT
= Error("transaction-id empty, COMMIT")
297 ETIDABTEMT
= Error("transaction-id empty, ABORT")
299 // Host header required, STOMP 1.1+
300 EREQHOST
= Error("host header required for STOMP 1.1+")
302 // Subscription errors.
303 EDUPSID
= Error("duplicate subscription-id")
304 EBADSID
= Error("invalid subscription-id")
307 ESBADAM
= Error("invalid ackmode, SUBSCRIBE")
309 // Unsubscribe error.
310 EUNOSID
= Error("id required, UNSUBSCRIBE")
311 EUNODSID
= Error("destination or id required, UNSUBSCRIBE") // 1.0
313 // Unsupported version error.
314 EBADVERCLI
= Error("unsupported protocol version, client")
315 EBADVERSVR
= Error("unsupported protocol version, server")
316 EBADVERNAK
= Error("unsupported protocol version, NACK")
318 // Unsupported Headers type.
319 EBADHDR
= Error("unsupported Headers type")
321 // Receipt not allowed on connect
322 ENORECPT
= Error("receipt not allowed on CONNECT")
324 // Invalid broker command
325 EINVBCMD
= Error("invalid broker command")
329 A zero length buffer for convenience.
331 var NULLBUFF
= make([]uint8, 0)
334 A no disconnect receipt Headers value for convenience.
336 var NoDiscReceipt
= Headers
{"noreceipt", "true"}
339 Codec data structure definition.
341 type codecdata
struct {
347 STOMP specification defined encoded / decoded values for the Message
350 var codecValues
= []codecdata
{
351 codecdata
{"\\\\", "\\"},
352 codecdata
{"\\" + "n", "\n"},
353 codecdata
{"\\" + "r", "\r"},
354 codecdata
{"\\c", ":"},
358 Control data for initialization of heartbeats with STOMP 1.1+, and the
359 subsequent control of any heartbeat routines.
361 type heartBeatData
struct {
362 sdl sync
.Mutex
// Send data lock
363 rdl sync
.Mutex
// Receive data lock
364 clk sync
.Mutex
// Shutdown lock
365 ssdn
bool // Shutdown complete
367 cx
int64 // client send value, ms
368 cy
int64 // client receive value, ms
369 sx
int64 // server send value, ms
370 sy
int64 // server receive value, ms
372 hbs
bool // sending heartbeats
373 hbr
bool // receiving heartbeats
375 sti
int64 // local sender ticker interval, ns
376 rti
int64 // local receiver ticker interval, ns
378 sc
int64 // local sender ticker count
379 rc
int64 // local receiver ticker count
381 ssd
chan struct{} // sender shutdown channel
382 rsd
chan struct{} // receiver shutdown channel
384 ls
int64 // last send time, ns
385 lr
int64 // last receive time, ns
389 Control structure for basic client metrics.
391 type metrics
struct {
392 st time
.Time
// Start Time
393 tfr
int64 // Total frame reads
394 tbr
int64 // Total bytes read
395 tfw
int64 // Total frame writes
396 tbw
int64 // Total bytes written
400 Valid broker commands.
402 var validCmds
= map[string]bool{MESSAGE
: true, ERROR
: true, RECEIPT
: true}
404 var logLock sync
.Mutex
407 NetProtoTCP
= "tcp" // Protocol Name
410 var HandShake
= []byte{0x15, 0x03, 0x03, 0x00}
416 HK_ACCEPT_VERSION
= "accept-version"
418 HK_CONTENT_TYPE
= "content-type"
419 HK_CONTENT_LENGTH
= "content-length"
420 HK_DESTINATION
= "destination"
421 HK_HEART_BEAT
= "heart-beat"
422 HK_HOST
= "host" // HK_VHOST aloas
425 HK_MESSAGE
= "message"
426 HK_MESSAGE_ID
= "message-id"
427 HK_SUPPRESS_CL
= "suppress-content-length" // Not in any spec, but used
428 HK_SUPPRESS_CT
= "suppress-content-type" // Not in any spec, but used
429 HK_PASSCODE
= "passcode"
430 HK_RECEIPT
= "receipt"
431 HK_RECEIPT_ID
= "receipt-id"
432 HK_SESSION
= "session"
434 HK_SUBSCRIPTION
= "subscription"
435 HK_TRANSACTION
= "transaction"
436 HK_VERSION
= "version"
437 HK_VHOST
= "host" // HK_HOST alias
445 AckModeClient
= "client"
446 AckModeClientIndividual
= "client-individual"
450 validAckModes10
= map[string]bool{AckModeAuto
: true,
452 validAckModes1x
= map[string]bool{AckModeClientIndividual
: true}
456 Default content-type.
459 DFLT_CONTENT_TYPE
= "text/plain; charset=UTF-8"
463 Extensions to STOMP protocol.
466 StompPlusDrainAfter
= "sng_drafter" // SUBSCRIBE Header
467 StompPlusDrainNow
= "sng_drnow" // UNSUBSCRIBE Header