test2
[test_vsfilter.git] / src / filters / BaseClasses / pullpin.cpp
blob14769d0503fd6dfb6c22859889a958ae9ff640f0
1 //------------------------------------------------------------------------------
2 // File: PullPin.cpp
3 //
4 // Desc: DirectShow base classes - implements CPullPin class that pulls data
5 // from IAsyncReader.
6 //
7 // Copyright (c) 1992-2002 Microsoft Corporation. All rights reserved.
8 //------------------------------------------------------------------------------
11 #include <streams.h>
12 #include "pullpin.h"
16 CPullPin::CPullPin()
17 : m_pReader(NULL),
18 m_pAlloc(NULL),
19 m_State(TM_Exit)
23 CPullPin::~CPullPin()
25 Disconnect();
28 // returns S_OK if successfully connected to an IAsyncReader interface
29 // from this object
30 // Optional allocator should be proposed as a preferred allocator if
31 // necessary
32 HRESULT
33 CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
35 CAutoLock lock(&m_AccessLock);
37 if (m_pReader) {
38 return VFW_E_ALREADY_CONNECTED;
41 HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
42 if (FAILED(hr)) {
43 return(hr);
46 hr = DecideAllocator(pAlloc, NULL);
47 if (FAILED(hr)) {
48 Disconnect();
49 return hr;
52 LONGLONG llTotal, llAvail;
53 hr = m_pReader->Length(&llTotal, &llAvail);
54 if (FAILED(hr)) {
55 Disconnect();
56 return hr;
59 // convert from file position to reference time
60 m_tDuration = llTotal * UNITS;
61 m_tStop = m_tDuration;
62 m_tStart = 0;
64 m_bSync = bSync;
66 return S_OK;
69 // disconnect any connection made in Connect
70 HRESULT
71 CPullPin::Disconnect()
73 CAutoLock lock(&m_AccessLock);
75 StopThread();
77 if (m_pReader) {
78 m_pReader->Release();
79 m_pReader = NULL;
82 if (m_pAlloc) {
83 m_pAlloc->Release();
84 m_pAlloc = NULL;
87 return S_OK;
90 // agree an allocator using RequestAllocator - optional
91 // props param specifies your requirements (non-zero fields).
92 // returns an error code if fail to match requirements.
93 // optional IMemAllocator interface is offered as a preferred allocator
94 // but no error occurs if it can't be met.
95 HRESULT
96 CPullPin::DecideAllocator(
97 IMemAllocator * pAlloc,
98 ALLOCATOR_PROPERTIES * pProps)
100 ALLOCATOR_PROPERTIES *pRequest;
101 ALLOCATOR_PROPERTIES Request;
102 if (pProps == NULL) {
103 Request.cBuffers = 3;
104 Request.cbBuffer = 64*1024;
105 Request.cbAlign = 0;
106 Request.cbPrefix = 0;
107 pRequest = &Request;
108 } else {
109 pRequest = pProps;
111 HRESULT hr = m_pReader->RequestAllocator(
112 pAlloc,
113 pRequest,
114 &m_pAlloc);
115 return hr;
118 // start pulling data
119 HRESULT
120 CPullPin::Active(void)
122 ASSERT(!ThreadExists());
123 return StartThread();
126 // stop pulling data
127 HRESULT
128 CPullPin::Inactive(void)
130 StopThread();
132 return S_OK;
135 HRESULT
136 CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
138 CAutoLock lock(&m_AccessLock);
140 ThreadMsg AtStart = m_State;
142 if (AtStart == TM_Start) {
143 BeginFlush();
144 PauseThread();
145 EndFlush();
148 m_tStart = tStart;
149 m_tStop = tStop;
151 HRESULT hr = S_OK;
152 if (AtStart == TM_Start) {
153 hr = StartThread();
156 return hr;
159 HRESULT
160 CPullPin::Duration(REFERENCE_TIME* ptDuration)
162 *ptDuration = m_tDuration;
163 return S_OK;
167 HRESULT
168 CPullPin::StartThread()
170 CAutoLock lock(&m_AccessLock);
172 if (!m_pAlloc || !m_pReader) {
173 return E_UNEXPECTED;
176 HRESULT hr;
177 if (!ThreadExists()) {
179 // commit allocator
180 hr = m_pAlloc->Commit();
181 if (FAILED(hr)) {
182 return hr;
185 // start thread
186 if (!Create()) {
187 return E_FAIL;
191 m_State = TM_Start;
192 hr = (HRESULT) CallWorker(m_State);
193 return hr;
196 HRESULT
197 CPullPin::PauseThread()
199 CAutoLock lock(&m_AccessLock);
201 if (!ThreadExists()) {
202 return E_UNEXPECTED;
205 // need to flush to ensure the thread is not blocked
206 // in WaitForNext
207 HRESULT hr = m_pReader->BeginFlush();
208 if (FAILED(hr)) {
209 return hr;
212 m_State = TM_Pause;
213 hr = CallWorker(TM_Pause);
215 m_pReader->EndFlush();
216 return hr;
219 HRESULT
220 CPullPin::StopThread()
222 CAutoLock lock(&m_AccessLock);
224 if (!ThreadExists()) {
225 return S_FALSE;
228 // need to flush to ensure the thread is not blocked
229 // in WaitForNext
230 HRESULT hr = m_pReader->BeginFlush();
231 if (FAILED(hr)) {
232 return hr;
235 m_State = TM_Exit;
236 hr = CallWorker(TM_Exit);
238 m_pReader->EndFlush();
240 // wait for thread to completely exit
241 Close();
243 // decommit allocator
244 if (m_pAlloc) {
245 m_pAlloc->Decommit();
248 return S_OK;
252 DWORD
253 CPullPin::ThreadProc(void)
255 while(1) {
256 DWORD cmd = GetRequest();
257 switch(cmd) {
258 case TM_Exit:
259 Reply(S_OK);
260 return 0;
262 case TM_Pause:
263 // we are paused already
264 Reply(S_OK);
265 break;
267 case TM_Start:
268 Reply(S_OK);
269 Process();
270 break;
273 // at this point, there should be no outstanding requests on the
274 // upstream filter.
275 // We should force begin/endflush to ensure that this is true.
276 // !!!Note that we may currently be inside a BeginFlush/EndFlush pair
277 // on another thread, but the premature EndFlush will do no harm now
278 // that we are idle.
279 m_pReader->BeginFlush();
280 CleanupCancelled();
281 m_pReader->EndFlush();
285 HRESULT
286 CPullPin::QueueSample(
287 REFERENCE_TIME& tCurrent,
288 REFERENCE_TIME tAlignStop,
289 BOOL bDiscontinuity
292 IMediaSample* pSample;
294 HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
295 if (FAILED(hr)) {
296 return hr;
299 LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
300 if (tStopThis > tAlignStop) {
301 tStopThis = tAlignStop;
303 pSample->SetTime(&tCurrent, &tStopThis);
304 tCurrent = tStopThis;
306 pSample->SetDiscontinuity(bDiscontinuity);
308 hr = m_pReader->Request(
309 pSample,
311 if (FAILED(hr)) {
312 pSample->Release();
314 CleanupCancelled();
315 OnError(hr);
317 return hr;
320 HRESULT
321 CPullPin::CollectAndDeliver(
322 REFERENCE_TIME tStart,
323 REFERENCE_TIME tStop)
325 IMediaSample* pSample = NULL; // better be sure pSample is set
326 DWORD_PTR dwUnused;
327 HRESULT hr = m_pReader->WaitForNext(
328 INFINITE,
329 &pSample,
330 &dwUnused);
331 if (FAILED(hr)) {
332 if (pSample) {
333 pSample->Release();
335 } else {
336 hr = DeliverSample(pSample, tStart, tStop);
338 if (FAILED(hr)) {
339 CleanupCancelled();
340 OnError(hr);
342 return hr;
346 HRESULT
347 CPullPin::DeliverSample(
348 IMediaSample* pSample,
349 REFERENCE_TIME tStart,
350 REFERENCE_TIME tStop
353 // fix up sample if past actual stop (for sector alignment)
354 REFERENCE_TIME t1, t2;
355 pSample->GetTime(&t1, &t2);
356 if (t2 > tStop) {
357 t2 = tStop;
360 // adjust times to be relative to (aligned) start time
361 t1 -= tStart;
362 t2 -= tStart;
363 pSample->SetTime(&t1, &t2);
366 HRESULT hr = Receive(pSample);
367 pSample->Release();
368 return hr;
371 void
372 CPullPin::Process(void)
374 // is there anything to do?
375 if (m_tStop <= m_tStart) {
376 EndOfStream();
377 return;
380 BOOL bDiscontinuity = TRUE;
382 // if there is more than one sample at the allocator,
383 // then try to queue 2 at once in order to overlap.
384 // -- get buffer count and required alignment
385 ALLOCATOR_PROPERTIES Actual;
386 HRESULT hr = m_pAlloc->GetProperties(&Actual);
388 // align the start position downwards
389 REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
390 REFERENCE_TIME tCurrent = tStart;
392 REFERENCE_TIME tStop = m_tStop;
393 if (tStop > m_tDuration) {
394 tStop = m_tDuration;
397 // align the stop position - may be past stop, but that
398 // doesn't matter
399 REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
402 DWORD dwRequest;
404 if (!m_bSync) {
406 // Break out of the loop either if we get to the end or we're asked
407 // to do something else
408 while (tCurrent < tAlignStop) {
410 // Break out without calling EndOfStream if we're asked to
411 // do something different
412 if (CheckRequest(&dwRequest)) {
413 return;
416 // queue a first sample
417 if (Actual.cBuffers > 1) {
419 hr = QueueSample(tCurrent, tAlignStop, TRUE);
420 bDiscontinuity = FALSE;
422 if (FAILED(hr)) {
423 return;
429 // loop queueing second and waiting for first..
430 while (tCurrent < tAlignStop) {
432 hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
433 bDiscontinuity = FALSE;
435 if (FAILED(hr)) {
436 return;
439 hr = CollectAndDeliver(tStart, tStop);
440 if (S_OK != hr) {
442 // stop if error, or if downstream filter said
443 // to stop.
444 return;
448 if (Actual.cBuffers > 1) {
449 hr = CollectAndDeliver(tStart, tStop);
450 if (FAILED(hr)) {
451 return;
455 } else {
457 // sync version of above loop
458 while (tCurrent < tAlignStop) {
460 // Break out without calling EndOfStream if we're asked to
461 // do something different
462 if (CheckRequest(&dwRequest)) {
463 return;
466 IMediaSample* pSample;
468 hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
469 if (FAILED(hr)) {
470 OnError(hr);
471 return;
474 LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
475 if (tStopThis > tAlignStop) {
476 tStopThis = tAlignStop;
478 pSample->SetTime(&tCurrent, &tStopThis);
479 tCurrent = tStopThis;
481 if (bDiscontinuity) {
482 pSample->SetDiscontinuity(TRUE);
483 bDiscontinuity = FALSE;
486 hr = m_pReader->SyncReadAligned(pSample);
488 if (FAILED(hr)) {
489 pSample->Release();
490 OnError(hr);
491 return;
494 hr = DeliverSample(pSample, tStart, tStop);
495 if (hr != S_OK) {
496 if (FAILED(hr)) {
497 OnError(hr);
499 return;
504 EndOfStream();
507 // after a flush, cancelled i/o will be waiting for collection
508 // and release
509 void
510 CPullPin::CleanupCancelled(void)
512 while (1) {
513 IMediaSample * pSample;
514 DWORD_PTR dwUnused;
516 HRESULT hr = m_pReader->WaitForNext(
517 0, // no wait
518 &pSample,
519 &dwUnused);
520 if(pSample) {
521 pSample->Release();
522 } else {
523 // no more samples
524 return;