asyncio.cpp

00001 //------------------------------------------------------------------------------
00002 // File: AsyncIo.cpp
00003 //
00004 // Desc: DirectShow sample code - base library with I/O functionality.
00005 //
00006 // Copyright (c) 1992-2001 Microsoft Corporation.  All rights reserved.
00007 //------------------------------------------------------------------------------
00008 
00009 
00010 #include "stdafx.h"
00011 #include "asyncio.h"
00012 
00013 // --- CAsyncRequest ---
00014 
00015 
00016 // implementation of CAsyncRequest representing a single
00017 // outstanding request. All the i/o for this object is done
00018 // in the Complete method.
00019 
00020 
00021 // init the params for this request.
00022 // Read is not issued until the complete call
00023 HRESULT
00024 CAsyncRequest::Request(
00025     CAsyncIo *pIo,
00026     CAsyncStream *pStream,
00027     LONGLONG llPos,
00028     LONG lLength,
00029     BOOL bAligned,
00030     BYTE* pBuffer,
00031     LPVOID pContext,    // filter's context
00032     DWORD dwUser)       // downstream filter's context
00033 {
00034     m_pIo = pIo;
00035     m_pStream = pStream;
00036     m_llPos = llPos;
00037     m_lLength = lLength;
00038     m_bAligned = bAligned;
00039     m_pBuffer = pBuffer;
00040     m_pContext = pContext;
00041     m_dwUser = dwUser;
00042     m_hr = VFW_E_TIMEOUT;   // not done yet
00043 
00044     return S_OK;
00045 }
00046 
00047 
00048 // issue the i/o if not overlapped, and block until i/o complete.
00049 // returns error code of file i/o
00050 //
00051 //
00052 HRESULT
00053 CAsyncRequest::Complete()
00054 {
00055     m_pStream->Lock();
00056 
00057     m_hr = m_pStream->SetPointer(m_llPos);
00058     if (S_OK == m_hr) {
00059 
00060         DWORD dwActual;
00061 
00062         m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual);
00063         if (m_hr == OLE_S_FIRST) {
00064             if (m_pContext) {
00065                 IMediaSample *pSample = reinterpret_cast<IMediaSample *>(m_pContext);
00066                 pSample->SetDiscontinuity(TRUE);
00067                 m_hr = S_OK;
00068             }
00069         }
00070 
00071         if (FAILED(m_hr)) {
00072         } else if (dwActual != (DWORD)m_lLength) {
00073             // tell caller size changed - probably because of EOF
00074             m_lLength = (LONG) dwActual;
00075             m_hr = S_FALSE;
00076         } else {
00077             m_hr = S_OK;
00078         }
00079     }
00080 
00081     m_pStream->Unlock();
00082     return m_hr;
00083 }
00084 
00085 
00086 
00087 // --- CAsyncIo ---
00088 
00089 // note - all events created manual reset
00090 
00091 CAsyncIo::CAsyncIo(CAsyncStream *pStream)
00092  : m_hThread(NULL),
00093    m_evWork(TRUE),
00094    m_evDone(TRUE),
00095    m_evStop(TRUE),
00096    m_listWork(NAME("Work list")),
00097    m_listDone(NAME("Done list")),
00098    m_bFlushing(FALSE),
00099    m_cItemsOut(0),
00100    m_bWaiting(FALSE),
00101    m_pStream(pStream)
00102 {
00103 
00104 }
00105 
00106 
00107 CAsyncIo::~CAsyncIo()
00108 {
00109     // move everything to the done list
00110     BeginFlush();
00111 
00112     // shutdown worker thread
00113     CloseThread();
00114 
00115     // empty the done list
00116     POSITION pos = m_listDone.GetHeadPosition();
00117     while (pos) {
00118         CAsyncRequest* pRequest = m_listDone.GetNext(pos);
00119         delete pRequest;
00120     }
00121     m_listDone.RemoveAll();
00122 }
00123 
00124 // ready for async activity - call this before calling Request.
00125 //
00126 // start the worker thread if we need to
00127 //
00128 // !!! use overlapped i/o if possible
00129 HRESULT
00130 CAsyncIo::AsyncActive(void)
00131 {
00132     return StartThread();
00133 }
00134 
00135 // call this when no more async activity will happen before
00136 // the next AsyncActive call
00137 //
00138 // stop the worker thread if active
00139 HRESULT
00140 CAsyncIo::AsyncInactive(void)
00141 {
00142     return CloseThread();
00143 }
00144 
00145 
00146 // add a request to the queue.
00147 HRESULT
00148 CAsyncIo::Request(
00149             LONGLONG llPos,
00150             LONG lLength,
00151             BOOL bAligned,
00152             BYTE* pBuffer,
00153             LPVOID pContext,
00154             DWORD dwUser)
00155 {
00156     if (bAligned) {
00157         if (!IsAligned(llPos) ||
00158         !IsAligned(lLength) ||
00159         !IsAligned((LONG) pBuffer)) {
00160             return VFW_E_BADALIGN;
00161         }
00162     }
00163 
00164     CAsyncRequest* pRequest = new CAsyncRequest;
00165 
00166     HRESULT hr = pRequest->Request(
00167                             this,
00168                             m_pStream,
00169                             llPos,
00170                             lLength,
00171                             bAligned,
00172                             pBuffer,
00173                             pContext,
00174                             dwUser);
00175     if (SUCCEEDED(hr)) {
00176         // might fail if flushing
00177         hr = PutWorkItem(pRequest);
00178     }
00179 
00180     if (FAILED(hr)) {
00181         delete pRequest;
00182     }
00183     return hr;
00184 }
00185 
00186 
00187 // wait for the next request to complete
00188 HRESULT
00189 CAsyncIo::WaitForNext(
00190     DWORD dwTimeout,
00191     LPVOID *ppContext,
00192     DWORD * pdwUser,
00193     LONG* pcbActual)
00194 {
00195     // some errors find a sample, others don't. Ensure that
00196     // *ppContext is NULL if no sample found
00197     *ppContext = NULL;
00198 
00199     // wait until the event is set, but since we are not
00200     // holding the critsec when waiting, we may need to re-wait
00201     for (;;) {
00202 
00203         if (!m_evDone.Wait(dwTimeout)) {
00204             // timeout occurred
00205             return VFW_E_TIMEOUT;
00206         }
00207 
00208         // get next event from list
00209         CAsyncRequest* pRequest = GetDoneItem();
00210         if (pRequest) {
00211             // found a completed request
00212 
00213             // check if ok
00214             HRESULT hr = pRequest->GetHResult();
00215             if (hr == S_FALSE) {
00216 
00217                 // this means the actual length was less than
00218                 // requested - may be ok if he aligned the end of file
00219                 if ((pRequest->GetActualLength() +
00220                      pRequest->GetStart()) == Size()) {
00221                     hr = S_OK;
00222                 } else {
00223                     // it was an actual read error
00224                     hr = E_FAIL;
00225                 }
00226             }
00227 
00228             // return actual bytes read
00229             *pcbActual = pRequest->GetActualLength();
00230 
00231             // return his context
00232             *ppContext = pRequest->GetContext();
00233             *pdwUser = pRequest->GetUser();
00234 
00235             delete pRequest;
00236             return hr;
00237         } else {
00238             //  Hold the critical section while checking the list state
00239             CAutoLock lck(&m_csLists);
00240             if (m_bFlushing && !m_bWaiting) {
00241 
00242                 // can't block as we are between BeginFlush and EndFlush
00243 
00244                 // but note that if m_bWaiting is set, then there are some
00245                 // items not yet complete that we should block for.
00246 
00247                 return VFW_E_WRONG_STATE;
00248             }
00249         }
00250 
00251         // done item was grabbed between completion and
00252         // us locking m_csLists.
00253     }
00254 }
00255 
00256 // perform a synchronous read request on this thread.
00257 // Need to hold m_csFile while doing this (done in request object)
00258 HRESULT
00259 CAsyncIo::SyncReadAligned(
00260             LONGLONG llPos,
00261             LONG lLength,
00262             BYTE* pBuffer,
00263             LONG* pcbActual,
00264             PVOID pvContext
00265             )
00266 {
00267     if (!IsAligned(llPos) ||
00268         !IsAligned(lLength) ||
00269         !IsAligned((LONG) pBuffer)) {
00270         return VFW_E_BADALIGN;
00271     }
00272 
00273     CAsyncRequest request;
00274 
00275     HRESULT hr = request.Request(
00276                     this,
00277                     m_pStream,
00278                     llPos,
00279                     lLength,
00280                     TRUE,
00281                     pBuffer,
00282                     pvContext,
00283                     0);
00284 
00285     if (FAILED(hr)) {
00286         return hr;
00287     }
00288 
00289     hr = request.Complete();
00290 
00291     // return actual data length
00292     *pcbActual = request.GetActualLength();
00293     return hr;
00294 }
00295 
00296 HRESULT
00297 CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG* pllAvailable)
00298 {
00299     *pllTotal = m_pStream->Size(pllAvailable);
00300     return S_OK;
00301 }
00302 
00303 // cancel all items on the worklist onto the done list
00304 // and refuse further requests or further WaitForNext calls
00305 // until the end flush
00306 //
00307 // WaitForNext must return with NULL only if there are no successful requests.
00308 // So Flush does the following:
00309 // 1. set m_bFlushing ensures no more requests succeed
00310 // 2. move all items from work list to the done list.
00311 // 3. If there are any outstanding requests, then we need to release the
00312 //    critsec to allow them to complete. The m_bWaiting as well as ensuring
00313 //    that we are signalled when they are all done is also used to indicate
00314 //    to WaitForNext that it should continue to block.
00315 // 4. Once all outstanding requests are complete, we force m_evDone set and
00316 //    m_bFlushing set and m_bWaiting false. This ensures that WaitForNext will
00317 //    not block when the done list is empty.
00318 HRESULT
00319 CAsyncIo::BeginFlush()
00320 {
00321     // hold the lock while emptying the work list
00322     {
00323         CAutoLock lock(&m_csLists);
00324 
00325         // prevent further requests being queued.
00326         // Also WaitForNext will refuse to block if this is set
00327         // unless m_bWaiting is also set which it will be when we release
00328         // the critsec if there are any outstanding).
00329         m_bFlushing = TRUE;
00330 
00331         CAsyncRequest * preq;
00332         while((preq = GetWorkItem()) != 0) {
00333             preq->Cancel();
00334             PutDoneItem(preq);
00335         }
00336 
00337         // now wait for any outstanding requests to complete
00338         if (m_cItemsOut > 0) {
00339 
00340             // can be only one person waiting
00341             ASSERT(!m_bWaiting);
00342 
00343             // this tells the completion routine that we need to be
00344             // signalled via m_evAllDone when all outstanding items are
00345             // done. It also tells WaitForNext to continue blocking.
00346             m_bWaiting = TRUE;
00347         } else {
00348             // all done
00349 
00350             // force m_evDone set so that even if list is empty,
00351             // WaitForNext will not block
00352             // don't do this until we are sure that all
00353             // requests are on the done list.
00354             m_evDone.Set();
00355             return S_OK;
00356         }
00357     }
00358 
00359     ASSERT(m_bWaiting);
00360 
00361     // wait without holding critsec
00362     for (;;) {
00363         m_evAllDone.Wait();
00364         {
00365             // hold critsec to check
00366             CAutoLock lock(&m_csLists);
00367 
00368             if (m_cItemsOut == 0) {
00369 
00370                 // now we are sure that all outstanding requests are on
00371                 // the done list and no more will be accepted
00372                 m_bWaiting = FALSE;
00373 
00374                 // force m_evDone set so that even if list is empty,
00375                 // WaitForNext will not block
00376                 // don't do this until we are sure that all
00377                 // requests are on the done list.
00378                 m_evDone.Set();
00379 
00380                 return S_OK;
00381             }
00382         }
00383     }
00384 }
00385 
00386 // end a flushing state
00387 HRESULT
00388 CAsyncIo::EndFlush()
00389 {
00390     CAutoLock lock(&m_csLists);
00391 
00392     m_bFlushing = FALSE;
00393 
00394     ASSERT(!m_bWaiting);
00395 
00396     // m_evDone might have been set by BeginFlush - ensure it is
00397     // set IFF m_listDone is non-empty
00398     if (m_listDone.GetCount() > 0) {
00399         m_evDone.Set();
00400     } else {
00401         m_evDone.Reset();
00402     }
00403 
00404     return S_OK;
00405 }
00406 
00407 // start the thread
00408 HRESULT
00409 CAsyncIo::StartThread(void)
00410 {
00411     if (m_hThread) {
00412         return S_OK;
00413     }
00414 
00415     // clear the stop event before starting
00416     m_evStop.Reset();
00417 
00418     DWORD dwThreadID;
00419     m_hThread = CreateThread(
00420                     NULL,
00421                     0,
00422                     InitialThreadProc,
00423                     this,
00424                     0,
00425                     &dwThreadID);
00426     if (!m_hThread) {
00427         DWORD dwErr = GetLastError();
00428         return HRESULT_FROM_WIN32(dwErr);
00429     }
00430     return S_OK;
00431 }
00432 
00433 // stop the thread and close the handle
00434 HRESULT
00435 CAsyncIo::CloseThread(void)
00436 {
00437     // signal the thread-exit object
00438     m_evStop.Set();
00439 
00440     if (m_hThread) {
00441 
00442         WaitForSingleObject(m_hThread, INFINITE);
00443         CloseHandle(m_hThread);
00444         m_hThread = NULL;
00445     }
00446     return S_OK;
00447 }
00448 
00449 
00450 // manage the list of requests. hold m_csLists and ensure
00451 // that the (manual reset) event hevList is set when things on
00452 // the list but reset when the list is empty.
00453 // returns null if list empty
00454 CAsyncRequest*
00455 CAsyncIo::GetWorkItem()
00456 {
00457     CAutoLock lck(&m_csLists);
00458 
00459     CAsyncRequest * preq  = m_listWork.RemoveHead();
00460 
00461     // force event set correctly
00462     if (m_listWork.GetCount() == 0) {
00463         m_evWork.Reset();
00464     }
00465     return preq;
00466 }
00467 
00468 // get an item from the done list
00469 CAsyncRequest*
00470 CAsyncIo::GetDoneItem()
00471 {
00472     CAutoLock lock(&m_csLists);
00473 
00474     CAsyncRequest * preq  = m_listDone.RemoveHead();
00475 
00476     // force event set correctly if list now empty
00477     // or we're in the final stages of flushing
00478     // Note that during flushing the way it's supposed to work is that
00479     // everything is shoved on the Done list then the application is
00480     // supposed to pull until it gets nothing more
00481     //
00482     // Thus we should not set m_evDone unconditionally until everything
00483     // has moved to the done list which means we must wait until
00484     // cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE).
00485 
00486     if (m_listDone.GetCount() == 0 &&
00487         (!m_bFlushing || m_bWaiting)) {
00488         m_evDone.Reset();
00489     }
00490 
00491     return preq;
00492 }
00493 
00494 // put an item on the work list - fail if bFlushing
00495 HRESULT
00496 CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
00497 {
00498     CAutoLock lock(&m_csLists);
00499     HRESULT hr;
00500 
00501     if (m_bFlushing) {
00502         hr = VFW_E_WRONG_STATE;
00503     }
00504     else if (m_listWork.AddTail(pRequest)) {
00505 
00506         // event should now be in a set state - force this
00507         m_evWork.Set();
00508 
00509         // start the thread now if not already started
00510         hr = StartThread();
00511 
00512     } else {
00513         hr = E_OUTOFMEMORY;
00514     }
00515 
00516     return(hr);
00517 }
00518 
00519 // put an item on the done list - ok to do this when
00520 // flushing
00521 HRESULT
00522 CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
00523 {
00524     ASSERT(CritCheckIn(&m_csLists));
00525 
00526     if (m_listDone.AddTail(pRequest)) {
00527 
00528         // event should now be in a set state - force this
00529         m_evDone.Set();
00530         return S_OK;
00531     } else {
00532         return E_OUTOFMEMORY;
00533     }
00534 }
00535 
00536 // called on thread to process any active requests
00537 void
00538 CAsyncIo::ProcessRequests(void)
00539 {
00540     // lock to get the item and increment the outstanding count
00541     CAsyncRequest * preq = NULL;
00542     for (;;) {
00543         {
00544             CAutoLock lock(&m_csLists);
00545 
00546             preq = GetWorkItem();
00547             if (preq == NULL) {
00548                 // done
00549                 return;
00550             }
00551 
00552             // one more item not on the done or work list
00553             m_cItemsOut++;
00554 
00555             // release critsec
00556         }
00557 
00558         preq->Complete();
00559 
00560         // regain critsec to replace on done list
00561         {
00562             CAutoLock l(&m_csLists);
00563 
00564             PutDoneItem(preq);
00565 
00566             if (--m_cItemsOut == 0) {
00567                 if (m_bWaiting) {
00568                     m_evAllDone.Set();
00569                 }
00570             }
00571         }
00572     }
00573 }
00574 
00575 // the thread proc - assumes that DWORD thread param is the
00576 // this pointer
00577 DWORD
00578 CAsyncIo::ThreadProc(void)
00579 {
00580     HANDLE ahev[] = {m_evStop, m_evWork};
00581 
00582     for (;;) {
00583             DWORD dw = WaitForMultipleObjects(
00584                         2,
00585                         ahev,
00586                         FALSE,
00587                         INFINITE);
00588             if (dw == WAIT_OBJECT_0+1) {
00589 
00590                 // requests need processing
00591                 ProcessRequests();
00592             } else {
00593                 // any error or stop event - we should exit
00594                 return 0;
00595             }
00596     }
00597 }
00598 
00599 
00600 
00601 // perform a synchronous read request on this thread.
00602 // may not be aligned - so we will have to buffer.
00603 HRESULT
00604 CAsyncIo::SyncRead(
00605             LONGLONG llPos,
00606             LONG lLength,
00607             BYTE* pBuffer)
00608 {
00609     if (IsAligned(llPos) &&
00610         IsAligned(lLength) &&
00611         IsAligned((LONG) pBuffer)) {
00612         LONG cbUnused;
00613             return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
00614     }
00615 
00616     // not aligned with requirements - use buffered file handle.
00618 
00619     CAsyncRequest request;
00620 
00621     HRESULT hr = request.Request(
00622                     this,
00623                     m_pStream,
00624                     llPos,
00625                     lLength,
00626                     FALSE,
00627                     pBuffer,
00628                     NULL,
00629                     0);
00630 
00631     if (FAILED(hr)) {
00632         return hr;
00633     }
00634 
00635     return request.Complete();
00636 }
00637 
00638 //  Return the alignment
00639 HRESULT
00640 CAsyncIo::Alignment(LONG *pl)
00641 {
00642     *pl = Alignment();
00643     return S_OK;
00644 }

Generated on Tue Dec 13 14:47:23 2005 for guliverkli by  doxygen 1.4.5