3 #include <new> // so I can throw std::bad_alloc
5 #include "ringbuffer.h"
7 class AutoCriticalRegion
9 MPCriticalRegionID m_mutex
;
11 AutoCriticalRegion(MPCriticalRegionID mutex
)
14 MPEnterCriticalRegion(m_mutex
, kDurationForever
);
18 MPExitCriticalRegion(m_mutex
);
22 JRingBuffer::JRingBuffer(int bs
)
23 : m_in(0), m_out(-1), m_buffersize(bs
), m_shutdown(0), m_reserved(0),
24 m_mutex(0), m_readtoken(0)
26 if ((m_buffer
= new unsigned char[m_buffersize
]) == 0) {
27 throw std::bad_alloc();
30 m_bufferend
= m_buffer
+ m_buffersize
;
31 if (MPCreateCriticalRegion(&m_mutex
) == noErr
) {
32 if (MPCreateBinarySemaphore(&m_readtoken
) == noErr
) {
35 MPDeleteCriticalRegion(m_mutex
);
38 // fall through here on failures
41 throw std::bad_alloc();
44 // make sure no one is using it when it is destructed!
45 JRingBuffer::~JRingBuffer() {
50 if (m_mutex
!= kInvalidID
)
51 MPDeleteCriticalRegion(m_mutex
);
52 if (m_readtoken
!= kInvalidID
)
53 MPDeleteSemaphore(m_readtoken
);
56 // returns 0 on success, -1 on failure
57 int JRingBuffer::Write(void *buf
, int len
)
59 int wasempty
= 0, overfill
= 0;
60 if (len
== 0) return 0;
61 // It is a programming error to stuff more than buffersize in at once.
62 assert(len
<= m_buffersize
);
64 AutoCriticalRegion
mutex(m_mutex
);
66 if (len
>= m_buffersize
) {
68 // a quickie optimization: if we are filling the buffer, just start
74 overfill
= (CanHold() < len
);
76 // OK, are we going to wrap?
77 if ((m_buffersize
- m_in
) <= len
) {
78 // yes, copy part and wrap
79 int part
= m_buffersize
- m_in
;
80 memmove(m_buffer
+ m_in
, buf
, part
);
82 buf
= (void *)((char *)buf
+ part
);
85 // not wrapping (anymore)
87 memmove(m_buffer
+ m_in
, buf
, len
);
91 // if empty, signal the CV, we just dumped in some data
94 MPSignalSemaphore(m_readtoken
);
99 // Get a pointer to a reserved buffer
100 void *JRingBuffer::ReserveToWrite(int &len
)
102 if (len
<= 0) return NULL
;
103 // It is a programming error to stuff more than buffersize in at once.
104 assert(len
<= m_buffersize
);
105 if (len
> m_buffersize
) len
= m_buffersize
;
107 MPEnterCriticalRegion(m_mutex
, kDurationForever
);
109 return ReserveToWrite_locked(len
);
112 // Call with the mutex LOCKED.
113 void *JRingBuffer::ReserveToWrite_locked(int &len
)
115 // OK, are we going to wrap?
116 if ((m_buffersize
- m_in
) <= len
) {
117 len
= m_buffersize
- m_in
;
120 return &m_buffer
[m_in
];
121 // MUTEX IS STILL LOCKED.
124 int JRingBuffer::CommitFinal(int wrotelen
)
126 int wasempty
= IsEmpty(), overfill
= CanHold() < wrotelen
;
128 assert(wrotelen
>= 0);
129 assert(m_reserved
>= wrotelen
);
132 if (m_in
== m_buffersize
) m_in
= 0;
137 MPSignalSemaphore(m_readtoken
);
140 MPExitCriticalRegion(m_mutex
);
144 void *JRingBuffer::CommitMore(int wrotelen
, int &morelen
)
146 int wasempty
= IsEmpty(), overfill
= CanHold() < wrotelen
;
148 assert(wrotelen
>= 0);
149 assert(m_reserved
>= wrotelen
);
152 if (m_in
== m_buffersize
) m_in
= 0;
157 MPSignalSemaphore(m_readtoken
);
158 // Interesting question: should we drop the mutex and relock?
159 // I suspect not, since the writer is probably a time-critical
163 return ReserveToWrite_locked(morelen
);
166 int JRingBuffer::ReadTimed(void *buf
, int len
, long timeout
, int flag
)
170 if (flag
== kAsync
) timeout
= 0;
172 expiry
= AddDurationToAbsolute((Duration
)timeout
, UpTime());
175 #ifndef NDEBUG // this is used only in an assert
180 // Obtain the read token
181 // First, recalculate the timeout (we may have waited before)
182 if (timeout
!= aLongTime
) {
183 timeout
= AbsoluteToDuration(
184 SubAbsoluteFromAbsolute(expiry
,
192 oops
= MPWaitOnSemaphore(m_readtoken
, (Duration
)timeout
);
203 AutoCriticalRegion
mutex(m_mutex
);
205 // While there is data and we need some...
206 while (!IsShutdown() && !IsEmpty() && len
> 0) {
207 // copy out a contiguous chunk.
208 int avail
= Contains();
209 // only read what's needed
210 if (avail
> len
) avail
= len
;
211 // now, are we wrapping?
212 if (avail
>= m_buffersize
- m_out
)
213 avail
= m_buffersize
- m_out
;
214 memmove(buf
, m_buffer
+ m_out
, avail
);
215 buf
= (void *)((char *)buf
+ avail
);
220 if (m_out
== m_buffersize
)
223 m_out
= -1; // we have emptied the buffer
227 if (!IsEmpty() || IsShutdown()) {
228 // regenerate the read token
229 MPSignalSemaphore(m_readtoken
);
235 // we have read as much as we can or need. Are we done?
236 if (flag
== kAsync
|| len
== 0 || (flag
== kSome
&& total_read
!= 0))
245 assert(total_read
< 0 || flag
!= kWait
|| total_read
== origlen
);
250 bool JRingBuffer::WaitForData(long timeout
)
253 err
= MPWaitOnSemaphore(m_readtoken
, timeout
);
254 if (err
== 0) MPSignalSemaphore(m_readtoken
);
258 void JRingBuffer::MakeEmpty()
260 MPEnterCriticalRegion(m_mutex
, kDurationForever
);
263 MPExitCriticalRegion(m_mutex
);
266 void JRingBuffer::Shutdown()
268 MPEnterCriticalRegion(m_mutex
, kDurationForever
);
272 MPExitCriticalRegion(m_mutex
);
273 (void)MPSignalSemaphore(m_readtoken
);