2 // Copyright © 2011-2019 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
28 Subscribe to a STOMP subscription.
30 Headers MUST contain a "destination" header key.
32 All clients are recommended to supply a unique HK_ID header on Subscribe.
34 For STOMP 1.0 clients: if an "id" header is supplied, attempt to use it.
35 If the "id" header is not unique in the session, return an error. If no
36 "id" header is supplied, attempt to generate a unique subscription id based
37 on the destination name. If a unique subscription id cannot be generated, return an error.
40 For STOMP 1.1+ clients: If any client does not supply an HK_ID header,
41 attempt to generate a unique "id". In all cases, do not allow duplicate
42 subscription "id"s in this session.
44 In summary, multiple subscriptions to the same destination are not allowed
45 unless a unique "id" is supplied.
47 For details about the returned MessageData channel, see: https://github.com/gmallard/stompngo/wiki/subscribe-and-messagedata
50 // Possible additional Header keys: "ack", "id".
51 h := stompngo.Headers{stompngo.HK_DESTINATION, "/queue/myqueue"}
52 s, e := c.Subscribe(h)
54 // Do something sane ...
58 func (c
*Connection
) Subscribe(h Headers
) (<-chan MessageData
, error
) {
// Subscribe validates the supplied headers, registers a subscription on this
// connection and hands a SUBSCRIBE frame to the wire writer.
// NOTE(review): several statements of this function (error returns, the
// definitions of ch and r, the final return) are not visible in this view;
// the comments below describe only the visible steps.
//
// Trace the start of the request with the caller's headers and protocol level.
59 c
.log(SUBSCRIBE
, "start", h
, c
.Protocol())
// Generic header validation for the connection's protocol level.
63 e
:= checkHeaders(h
, c
.Protocol())
// SUBSCRIBE specific validation (destination and ack mode headers).
67 e
= c
.checkSubscribeHeaders(h
)
// Default the ack mode to AckModeAuto when no "ack" header is present.
// NOTE(review): ch is presumably a copy of h made on a line not shown here -
// TODO confirm.
72 if _
, ok
:= ch
.Contains(HK_ACK
); !ok
{
73 ch
= append(ch
, HK_ACK
, AckModeAuto
)
// Register the subscription. The returned headers replace ch because
// establishSubscription may add an "id" header.
75 sub
, e
, ch
:= c
.establishSubscription(ch
)
// Build the SUBSCRIBE frame from the final headers and an empty body.
80 f
:= Frame
{SUBSCRIBE
, ch
, NULLBUFF
}
// Pass the frame to the writer together with r.
// NOTE(review): r is defined on a line not visible here; it looks like the
// channel on which the write result is reported - TODO confirm.
83 if e
= c
.writeWireData(wiredata
{f
, r
}); e
!= nil {
// Trace the end of the request with the headers actually used.
87 c
.log(SUBSCRIBE
, "end", ch
, c
.Protocol())
92 Check SUBSCRIBE specific requirements.
94 func (c
*Connection
) checkSubscribeHeaders(h Headers
) error
{
// checkSubscribeHeaders verifies the headers of a SUBSCRIBE request: a
// "destination" header is looked up, and a client supplied "ack" header is
// checked against the ack modes valid for the connection's protocol level.
// NOTE(review): the case labels and return statements are not visible in this
// view; the specific errors returned cannot be confirmed from here.
//
// A destination header is required.
95 if _
, ok
:= h
.Contains(HK_DESTINATION
); !ok
{
// am is the ack mode value; ok reports whether the client supplied one.
99 am
, ok
:= h
.Contains(HK_ACK
)
// The set of acceptable ack modes depends on the protocol level.
101 switch c
.Protocol() {
// This branch accepts only the modes listed in validAckModes10.
103 if ok
{ // Client supplied ack header
104 if !validAckModes10
[am
] {
// This branch accepts modes from either validAckModes10 or validAckModes1x.
111 if ok
{ // Client supplied ack header
112 if !(validAckModes10
[am
] || validAckModes1x
[am
]) {
// An unrecognized protocol level is treated as an internal error.
// NOTE(review): if log is the standard library package, Fatalf exits the
// process after logging - confirm this is intended in library code.
117 log
.Fatalf("Internal protocol level error:<%s>\n", c
.Protocol())
125 func (c
*Connection
) establishSubscription(h Headers
) (*subscription
, error
, Headers
) {
// establishSubscription creates the subscription record for a SUBSCRIBE
// request and stores it in c.subs. It returns the new subscription, an error
// (EDUPSID when a duplicate subscription id is detected, otherwise nil) and
// the headers, which may have had an "id" header added.
// NOTE(review): the conditions guarding the duplicate checks, the definition
// of uuid1, the case labels and the write lock calls are not visible in this
// view.
126 c
.log(SUBSCRIBE
, "start establishSubscription")
127 defer c
.log(SUBSCRIBE
, "end establishSubscription")
// id is the caller supplied subscription id; hid reports whether one was given.
129 id
, hid
:= h
.Contains(HK_ID
)
// Candidate generated id: the Sha1 helper applied to the destination name.
131 sha11
:= Sha1(h
.Value(HK_DESTINATION
))
// The duplicate checks below read c.subs under the read lock. Every early
// return releases the read lock first.
133 c
.subsLock
.RLock() // Acquire Read lock
// Duplicate check against the caller supplied id.
136 if _
, q
:= c
.subs
[id
]; q
{
137 c
.subsLock
.RUnlock() // Release Read lock
138 return nil, EDUPSID
, h
// Duplicate subscriptions not allowed
// Duplicate check against the destination based id.
140 if _
, q
:= c
.subs
[sha11
]; q
{
141 c
.subsLock
.RUnlock() // Release Read lock
142 return nil, EDUPSID
, h
// Duplicate subscriptions not allowed
// Duplicate check against uuid1.
// NOTE(review): uuid1 is defined on a line not visible here; presumably a
// freshly generated UUID - TODO confirm.
145 if _
, q
:= c
.subs
[uuid1
]; q
{
146 c
.subsLock
.RUnlock() // Release Read lock
147 return nil, EDUPSID
, h
// Duplicate subscriptions not allowed
150 c
.subsLock
.RUnlock() // Release Read lock
// Build the subscription record with its initial state.
152 sd
:= new(subscription
) // New subscription data
154 sd
.id
= id
// Note user supplied id
156 sd
.cs
= false // No shutdown yet
157 sd
.drav
= false // Drain after value validity
158 sd
.dra
= 0 // Never drain MESSAGE frames
159 sd
.drmc
= 0 // Current drain count
// The MessageData channel is created with capacity c.scc.
160 sd
.md
= make(chan MessageData
, c
.scc
) // Make subscription MD channel
161 sd
.am
= h
.Value(HK_ACK
) // Set subscription ack mode
164 // No caller supplied ID. This STOMP client package supplies one. It is the
165 // caller's responsibility to discover the value from subsequent message headers.
// The generated id depends on the protocol level.
167 switch c
.Protocol() {
// One branch uses the destination hash as the id and adds it to the headers.
169 nsid
:= sha11
// This will be unique for a given destination
171 h
= h
.Add(HK_ID
, nsid
)
// Another branch adds uuid1 to the headers as the id.
176 h
= h
.Add(HK_ID
, uuid1
)
// An unrecognized protocol level is treated as an internal error.
// NOTE(review): if log is the standard library package, Fatalf exits the
// process after logging - confirm this is intended in library code.
178 log
.Fatalf("Internal protocol level error:<%s>\n", c
.Protocol())
182 // STOMP Protocol Enhancement
// Optional drain-after header: its value is parsed as a base 10 integer
// (bitSize 0 selects the size of int).
183 if dc
, okda
:= h
.Contains(StompPlusDrainAfter
); okda
{
184 n
, e
:= strconv
.ParseInt(dc
, 10, 0)
// A conversion failure is logged.
// NOTE(review): the surrounding if/else is not visible; presumably the drain
// fields are only set when parsing succeeds - TODO confirm.
186 log
.Printf("sng_drafter conversion error: %v\n", e
)
// NOTE(review): a negative parsed value would wrap in the uint conversion
// below unless it is rejected on a line not visible here.
188 sd
.drav
= true // Drain after value is OK
189 sd
.dra
= uint(n
) // Drain after count
193 // This is a write lock
// Publish the new subscription under its id.
// NOTE(review): the lock/unlock calls around this map write are not visible.
195 c
.subs
[sd
.id
] = sd
// Add subscription to the connection subscription map
198 return sd
, nil, h
// Return the subscription pointer