1 /******************************************************************************
5 Copyright 1995-97, Be Incorporated
7 ******************************************************************************/
8 #ifndef _BUFFER_STREAM_H
9 #define _BUFFER_STREAM_H
13 #include <SupportDefs.h>
15 #include <Messenger.h>
22 Per-subscriber information.
27 typedef struct _sub_info
{
28 _sub_info
*fNext
; /* next subscriber in the stream*/
29 _sub_info
*fPrev
; /* previous subscriber in the stream */
30 _sbuf_info
*fRel
; /* next buf to be released */
31 _sbuf_info
*fAcq
; /* next buf to be acquired */
32 sem_id fSem
; /* semaphore used for blocking */
33 bigtime_t fTotalTime
; /* accumulated time between acq/rel */
34 int32 fHeld
; /* # of buffers acq'd but not yet rel'd */
35 sem_id fBlockedOn
; /* the semaphore being waited on */
36 /* or B_BAD_SEM_ID if not blocked */
41 Per-buffer information
44 typedef struct _sbuf_info
{
45 _sbuf_info
*fNext
; /* next "newer" buffer in the chain */
46 subscriber_id fAvailTo
; /* next subscriber to acquire this buffer */
47 subscriber_id fHeldBy
; /* subscriber that's acquired this buffer */
48 bigtime_t fAcqTime
; /* time at which this buffer was acquired */
49 area_id fAreaID
; /* for system memory allocation calls */
51 int32 fSize
; /* usable portion can be smaller than ... */
52 int32 fAreaSize
; /* ... the size of the area. */
53 bool fIsFinal
; /* TRUE => stream is stopping */
58 Interface definition for BBufferStream class
61 /* We've chosen B_MAX_SUBSCRIBER_COUNT and B_MAX_BUFFER_COUNT to be small
62 * enough so that a BBufferStream structure fits in one 4096 byte page.
64 #define B_MAX_SUBSCRIBER_COUNT 52
65 #define B_MAX_BUFFER_COUNT 32
68 class BBufferStreamManager
;
70 typedef BBufferStream
* stream_id
; // for now
73 class BAbstractBufferStream
77 virtual ~BAbstractBufferStream();
80 virtual status_t
GetStreamParameters(size_t *bufferSize
,
83 int32
*subscriberCount
) const;
85 virtual status_t
SetStreamBuffers(size_t bufferSize
,
88 virtual status_t
StartStreaming();
89 virtual status_t
StopStreaming();
93 virtual void _ReservedAbstractBufferStream1();
94 virtual void _ReservedAbstractBufferStream2();
95 virtual void _ReservedAbstractBufferStream3();
96 virtual void _ReservedAbstractBufferStream4();
98 friend class BSubscriber
;
99 friend class BBufferStreamManager
;
101 virtual stream_id
StreamID() const;
102 /* stream identifier for direct access */
104 /* Create or delete a subscriber id for subsequent operations */
105 virtual status_t
Subscribe(char *name
,
106 subscriber_id
*subID
,
108 virtual status_t
Unsubscribe(subscriber_id subID
);
110 /* Enter into or quit the stream */
111 virtual status_t
EnterStream(subscriber_id subID
,
112 subscriber_id neighbor
,
115 virtual status_t
ExitStream(subscriber_id subID
);
117 virtual BMessenger
* Server() const; /* message pipe to server */
118 status_t
SendRPC(BMessage
* msg
, BMessage
* reply
= NULL
) const;
122 class BBufferStream
: public BAbstractBufferStream
126 BBufferStream(size_t headerSize
,
127 BBufferStreamManager
* controller
,
128 BSubscriber
* headFeeder
,
129 BSubscriber
* tailFeeder
);
130 virtual ~BBufferStream();
132 /* BBufferStreams are allocated on shared memory pages */
133 void *operator new(size_t size
);
134 void operator delete(void *stream
, size_t size
);
136 /* Return header size */
137 size_t HeaderSize() const;
139 /* These four functions are delegated to the stream controller */
140 status_t
GetStreamParameters(size_t *bufferSize
,
143 int32
*subscriberCount
) const;
145 status_t
SetStreamBuffers(size_t bufferSize
,
148 status_t
StartStreaming();
149 status_t
StopStreaming();
151 /* Get the controller for delegation */
152 BBufferStreamManager
*StreamManager() const;
154 /* number of buffers in stream */
155 int32
CountBuffers() const;
157 /* Create or delete a subscriber id for subsequent operations */
158 status_t
Subscribe(char *name
,
159 subscriber_id
*subID
,
162 status_t
Unsubscribe(subscriber_id subID
);
164 /* Enter into or quit the stream */
165 status_t
EnterStream(subscriber_id subID
,
166 subscriber_id neighbor
,
169 status_t
ExitStream(subscriber_id subID
);
171 /* queries about a subscriber */
172 bool IsSubscribed(subscriber_id subID
);
173 bool IsEntered(subscriber_id subID
);
175 status_t
SubscriberInfo(subscriber_id subID
,
180 /* Force an error return of a subscriber if it's blocked */
181 status_t
UnblockSubscriber(subscriber_id subID
);
183 /* Acquire and release a buffer */
184 status_t
AcquireBuffer(subscriber_id subID
,
187 status_t
ReleaseBuffer(subscriber_id subID
);
189 /* Get the attributes of a particular buffer */
190 size_t BufferSize(buffer_id bufID
) const;
191 char *BufferData(buffer_id bufID
) const;
192 bool IsFinalBuffer(buffer_id bufID
) const;
194 /* Get attributes of a particular subscriber */
195 int32
CountBuffersHeld(subscriber_id subID
);
197 /* Queries for the BBufferStream */
198 int32
CountSubscribers() const;
199 int32
CountEnteredSubscribers() const;
201 subscriber_id
FirstSubscriber() const;
202 subscriber_id
LastSubscriber() const;
203 subscriber_id
NextSubscriber(subscriber_id subID
);
204 subscriber_id
PrevSubscriber(subscriber_id subID
);
209 void PrintSubscribers();
211 /* gaining exclusive access to the BBufferStream */
215 /* introduce a new buffer into the "newest" end of the chain */
216 status_t
AddBuffer(buffer_id bufID
);
218 /* remove a buffer from the "oldest" end of the chain */
219 buffer_id
RemoveBuffer(bool force
);
221 /* allocate a buffer from shared memory and create a bufID for it. */
222 buffer_id
CreateBuffer(size_t size
, bool isFinal
);
224 /* deallocate a buffer and returns its bufID to the freelist */
225 void DestroyBuffer(buffer_id bufID
);
227 /* remove and destroy any "newest" buffers from the head of the chain
228 * that have not yet been claimed by any subscribers. If there are
229 * no subscribers, this clears the entire chain.
231 void RescindBuffers();
234 Private member functions that assume locking already has been done.
239 virtual void _ReservedBufferStream1();
240 virtual void _ReservedBufferStream2();
241 virtual void _ReservedBufferStream3();
242 virtual void _ReservedBufferStream4();
244 /* initialize the free list of subscribers */
245 void InitSubscribers();
247 /* return TRUE if subID appears valid */
248 bool IsSubscribedSafe(subscriber_id subID
) const;
250 /* return TRUE if subID is entered into the stream */
251 bool IsEnteredSafe(subscriber_id subID
) const;
253 /* initialize the free list of buffer IDs */
256 /* Wake a blocked subscriber */
257 status_t
WakeSubscriber(subscriber_id subID
);
259 /* Give subID all the buffers it can get */
260 void InheritBuffers(subscriber_id subID
);
262 /* Relinquish any buffers held by subID */
263 void BequeathBuffers(subscriber_id subID
);
265 /* Fast version of ReleaseBuffer() */
266 status_t
ReleaseBufferSafe(subscriber_id subID
);
268 /* Release a buffer to a subscriber */
269 status_t
ReleaseBufferTo(buffer_id bufID
, subscriber_id subID
);
271 /* deallocate all buffers */
272 void FreeAllBuffers();
274 /* deallocate all subscribers */
275 void FreeAllSubscribers();
282 area_id fAreaID
; /* area id for this BBufferStream */
283 BBufferStreamManager
*fStreamManager
;
284 BSubscriber
*fHeadFeeder
;
285 BSubscriber
*fTailFeeder
;
292 _sub_info
*fFreeSubs
; /* free list of subscribers */
293 _sub_info
*fFirstSub
; /* first entered in itinierary */
294 _sub_info
*fLastSub
; /* last entered in itinerary */
296 sem_id fFirstSem
; /* semaphore used by fFirstSub */
298 int32 fEnteredSubCount
;
300 _sub_info fSubscribers
[B_MAX_SUBSCRIBER_COUNT
];
306 _sbuf_info
*fFreeBuffers
;
307 _sbuf_info
*fOldestBuffer
; /* first in line */
308 _sbuf_info
*fNewestBuffer
; /* fNewest->fNext = NULL */
311 _sbuf_info fBuffers
[B_MAX_BUFFER_COUNT
];
316 #endif // #ifdef _BUFFER_STREAM_H