2 * Copyright 2001-2005, Haiku.
3 * Distributed under the terms of the MIT License.
6 * Pahtz <pahtz@yahoo.com.au>
7 * Axel Dörfler, axeld@pinc-software.de
10 /** Class for low-overhead port-based messaging */
17 #include <ServerProtocol.h>
18 #include <LinkSender.h>
20 #include "link_message.h"
23 //#define DEBUG_BPORTLINK
24 #ifdef DEBUG_BPORTLINK
26 # define STRACE(x) printf x
31 static const size_t kMaxStringSize
= 4096;
32 static const size_t kWatermark
= kInitialBufferSize
- 24;
33 // if a message is started after this mark, the buffer is flushed automatically
37 LinkSender::LinkSender(port_id port
)
51 LinkSender::~LinkSender()
58 LinkSender::SetPort(port_id port
)
65 LinkSender::StartMessage(int32 code
, size_t minSize
)
67 // end previous message
68 if (EndMessage() < B_OK
)
71 if (minSize
> kMaxBufferSize
- sizeof(message_header
)) {
72 // we will handle this case in Attach, using an area
73 minSize
= sizeof(area_id
);
76 minSize
+= sizeof(message_header
);
78 // Eventually flush buffer to make space for the new message.
79 // Note, we do not take the actual buffer size into account to not
80 // delay the time between buffer flushes too much.
81 if (fBufferSize
> 0 && (minSize
> SpaceLeft() || fCurrentStart
>= kWatermark
)) {
82 status_t status
= Flush();
87 if (minSize
> fBufferSize
) {
88 if (AdjustBuffer(minSize
) != B_OK
)
89 return fCurrentStatus
= B_NO_MEMORY
;
92 message_header
*header
= (message_header
*)(fBuffer
+ fCurrentStart
);
98 STRACE(("info: LinkSender buffered header %ld (%lx) [%lu %lu %lu].\n",
99 code
, code
, header
->size
, header
->code
, header
->flags
));
101 fCurrentEnd
+= sizeof(message_header
);
107 LinkSender::EndMessage(bool needsReply
)
109 if (fCurrentEnd
== fCurrentStart
|| fCurrentStatus
< B_OK
)
110 return fCurrentStatus
;
112 // record the size of the message
113 message_header
*header
= (message_header
*)(fBuffer
+ fCurrentStart
);
114 header
->size
= CurrentMessageSize();
116 header
->flags
|= needsReply
;
118 STRACE(("info: LinkSender EndMessage() of size %ld.\n", header
->size
));
120 // bump to start of next message
121 fCurrentStart
= fCurrentEnd
;
127 LinkSender::CancelMessage()
129 fCurrentEnd
= fCurrentStart
;
130 fCurrentStatus
= B_OK
;
135 LinkSender::Attach(const void *passedData
, size_t passedSize
)
137 size_t size
= passedSize
;
138 const void* data
= passedData
;
140 if (fCurrentStatus
< B_OK
)
141 return fCurrentStatus
;
144 return fCurrentStatus
= B_BAD_VALUE
;
146 if (fCurrentEnd
== fCurrentStart
)
147 return B_NO_INIT
; // need to call StartMessage() first
149 bool useArea
= false;
150 if (size
>= kMaxBufferSize
) {
152 size
= sizeof(area_id
);
155 if (SpaceLeft() < size
) {
156 // we have to make space for the data
158 status_t status
= FlushCompleted(size
+ CurrentMessageSize());
160 return fCurrentStatus
= status
;
163 area_id senderArea
= -1;
165 if (fTargetTeam
< 0) {
167 status_t result
= get_port_info(fPort
, &info
);
170 fTargetTeam
= info
.team
;
172 void* address
= NULL
;
173 off_t alignedSize
= (passedSize
+ B_PAGE_SIZE
) & ~(B_PAGE_SIZE
- 1);
174 senderArea
= create_area("LinkSenderArea", &address
, B_ANY_ADDRESS
,
175 alignedSize
, B_NO_LOCK
, B_READ_AREA
| B_WRITE_AREA
);
177 if (senderArea
< B_OK
)
181 memcpy(address
, passedData
, passedSize
);
183 area_id areaID
= senderArea
;
184 senderArea
= _kern_transfer_area(senderArea
, &address
,
185 B_ANY_ADDRESS
, fTargetTeam
);
187 if (senderArea
< B_OK
) {
193 memcpy(fBuffer
+ fCurrentEnd
, data
, size
);
201 LinkSender::AttachString(const char *string
, int32 length
)
206 size_t maxLength
= strlen(string
);
208 length
= (int32
)maxLength
;
210 // we should report an error here
211 if (maxLength
> kMaxStringSize
)
213 } else if (length
> (int32
)maxLength
)
216 status_t status
= Attach
<int32
>(length
);
221 status
= Attach(string
, length
);
223 fCurrentEnd
-= sizeof(int32
); // rewind the transaction
231 LinkSender::AdjustBuffer(size_t newSize
, char **_oldBuffer
)
233 // make sure the new size is within bounds
234 if (newSize
<= kInitialBufferSize
)
235 newSize
= kInitialBufferSize
;
236 else if (newSize
> kMaxBufferSize
)
237 return B_BUFFER_OVERFLOW
;
238 else if (newSize
> kInitialBufferSize
)
239 newSize
= (newSize
+ B_PAGE_SIZE
- 1) & ~(B_PAGE_SIZE
- 1);
241 if (newSize
== fBufferSize
) {
242 // keep existing buffer
244 *_oldBuffer
= fBuffer
;
248 // create new larger buffer
249 char *buffer
= (char *)malloc(newSize
);
254 *_oldBuffer
= fBuffer
;
259 fBufferSize
= newSize
;
265 LinkSender::FlushCompleted(size_t newBufferSize
)
267 // we need to hide the incomplete message so that it's not flushed
268 int32 end
= fCurrentEnd
;
269 int32 start
= fCurrentStart
;
270 fCurrentEnd
= fCurrentStart
;
272 status_t status
= Flush();
278 char *oldBuffer
= NULL
;
279 status
= AdjustBuffer(newBufferSize
, &oldBuffer
);
283 // move the incomplete message to the start of the buffer
284 fCurrentEnd
= end
- start
;
285 if (oldBuffer
!= fBuffer
) {
286 memcpy(fBuffer
, oldBuffer
+ start
, fCurrentEnd
);
289 memmove(fBuffer
, fBuffer
+ start
, fCurrentEnd
);
296 LinkSender::Flush(bigtime_t timeout
, bool needsReply
)
298 if (fCurrentStatus
< B_OK
)
299 return fCurrentStatus
;
301 EndMessage(needsReply
);
302 if (fCurrentStart
== 0)
305 STRACE(("info: LinkSender Flush() waiting to send messages of %ld bytes on port %ld.\n",
306 fCurrentEnd
, fPort
));
309 if (timeout
!= B_INFINITE_TIMEOUT
) {
311 err
= write_port_etc(fPort
, kLinkCode
, fBuffer
,
312 fCurrentEnd
, B_RELATIVE_TIMEOUT
, timeout
);
313 } while (err
== B_INTERRUPTED
);
316 err
= write_port(fPort
, kLinkCode
, fBuffer
, fCurrentEnd
);
317 } while (err
== B_INTERRUPTED
);
321 STRACE(("error info: LinkSender Flush() failed for %ld bytes (%s) on port %ld.\n",
322 fCurrentEnd
, strerror(err
), fPort
));
326 STRACE(("info: LinkSender Flush() messages total of %ld bytes on port %ld.\n",
327 fCurrentEnd
, fPort
));
335 } // namespace BPrivate