00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "stdafx.h"
00011 #include "asyncio.h"
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 HRESULT
00024 CAsyncRequest::Request(
00025 CAsyncIo *pIo,
00026 CAsyncStream *pStream,
00027 LONGLONG llPos,
00028 LONG lLength,
00029 BOOL bAligned,
00030 BYTE* pBuffer,
00031 LPVOID pContext,
00032 DWORD dwUser)
00033 {
00034 m_pIo = pIo;
00035 m_pStream = pStream;
00036 m_llPos = llPos;
00037 m_lLength = lLength;
00038 m_bAligned = bAligned;
00039 m_pBuffer = pBuffer;
00040 m_pContext = pContext;
00041 m_dwUser = dwUser;
00042 m_hr = VFW_E_TIMEOUT;
00043
00044 return S_OK;
00045 }
00046
00047
00048
00049
00050
00051
00052 HRESULT
00053 CAsyncRequest::Complete()
00054 {
00055 m_pStream->Lock();
00056
00057 m_hr = m_pStream->SetPointer(m_llPos);
00058 if (S_OK == m_hr) {
00059
00060 DWORD dwActual;
00061
00062 m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual);
00063 if (m_hr == OLE_S_FIRST) {
00064 if (m_pContext) {
00065 IMediaSample *pSample = reinterpret_cast<IMediaSample *>(m_pContext);
00066 pSample->SetDiscontinuity(TRUE);
00067 m_hr = S_OK;
00068 }
00069 }
00070
00071 if (FAILED(m_hr)) {
00072 } else if (dwActual != (DWORD)m_lLength) {
00073
00074 m_lLength = (LONG) dwActual;
00075 m_hr = S_FALSE;
00076 } else {
00077 m_hr = S_OK;
00078 }
00079 }
00080
00081 m_pStream->Unlock();
00082 return m_hr;
00083 }
00084
00085
00086
00087
00088
00089
00090
00091 CAsyncIo::CAsyncIo(CAsyncStream *pStream)
00092 : m_hThread(NULL),
00093 m_evWork(TRUE),
00094 m_evDone(TRUE),
00095 m_evStop(TRUE),
00096 m_listWork(NAME("Work list")),
00097 m_listDone(NAME("Done list")),
00098 m_bFlushing(FALSE),
00099 m_cItemsOut(0),
00100 m_bWaiting(FALSE),
00101 m_pStream(pStream)
00102 {
00103
00104 }
00105
00106
00107 CAsyncIo::~CAsyncIo()
00108 {
00109
00110 BeginFlush();
00111
00112
00113 CloseThread();
00114
00115
00116 POSITION pos = m_listDone.GetHeadPosition();
00117 while (pos) {
00118 CAsyncRequest* pRequest = m_listDone.GetNext(pos);
00119 delete pRequest;
00120 }
00121 m_listDone.RemoveAll();
00122 }
00123
00124
00125
00126
00127
00128
00129 HRESULT
00130 CAsyncIo::AsyncActive(void)
00131 {
00132 return StartThread();
00133 }
00134
00135
00136
00137
00138
00139 HRESULT
00140 CAsyncIo::AsyncInactive(void)
00141 {
00142 return CloseThread();
00143 }
00144
00145
00146
00147 HRESULT
00148 CAsyncIo::Request(
00149 LONGLONG llPos,
00150 LONG lLength,
00151 BOOL bAligned,
00152 BYTE* pBuffer,
00153 LPVOID pContext,
00154 DWORD dwUser)
00155 {
00156 if (bAligned) {
00157 if (!IsAligned(llPos) ||
00158 !IsAligned(lLength) ||
00159 !IsAligned((LONG) pBuffer)) {
00160 return VFW_E_BADALIGN;
00161 }
00162 }
00163
00164 CAsyncRequest* pRequest = new CAsyncRequest;
00165
00166 HRESULT hr = pRequest->Request(
00167 this,
00168 m_pStream,
00169 llPos,
00170 lLength,
00171 bAligned,
00172 pBuffer,
00173 pContext,
00174 dwUser);
00175 if (SUCCEEDED(hr)) {
00176
00177 hr = PutWorkItem(pRequest);
00178 }
00179
00180 if (FAILED(hr)) {
00181 delete pRequest;
00182 }
00183 return hr;
00184 }
00185
00186
00187
00188 HRESULT
00189 CAsyncIo::WaitForNext(
00190 DWORD dwTimeout,
00191 LPVOID *ppContext,
00192 DWORD * pdwUser,
00193 LONG* pcbActual)
00194 {
00195
00196
00197 *ppContext = NULL;
00198
00199
00200
00201 for (;;) {
00202
00203 if (!m_evDone.Wait(dwTimeout)) {
00204
00205 return VFW_E_TIMEOUT;
00206 }
00207
00208
00209 CAsyncRequest* pRequest = GetDoneItem();
00210 if (pRequest) {
00211
00212
00213
00214 HRESULT hr = pRequest->GetHResult();
00215 if (hr == S_FALSE) {
00216
00217
00218
00219 if ((pRequest->GetActualLength() +
00220 pRequest->GetStart()) == Size()) {
00221 hr = S_OK;
00222 } else {
00223
00224 hr = E_FAIL;
00225 }
00226 }
00227
00228
00229 *pcbActual = pRequest->GetActualLength();
00230
00231
00232 *ppContext = pRequest->GetContext();
00233 *pdwUser = pRequest->GetUser();
00234
00235 delete pRequest;
00236 return hr;
00237 } else {
00238
00239 CAutoLock lck(&m_csLists);
00240 if (m_bFlushing && !m_bWaiting) {
00241
00242
00243
00244
00245
00246
00247 return VFW_E_WRONG_STATE;
00248 }
00249 }
00250
00251
00252
00253 }
00254 }
00255
00256
00257
00258 HRESULT
00259 CAsyncIo::SyncReadAligned(
00260 LONGLONG llPos,
00261 LONG lLength,
00262 BYTE* pBuffer,
00263 LONG* pcbActual,
00264 PVOID pvContext
00265 )
00266 {
00267 if (!IsAligned(llPos) ||
00268 !IsAligned(lLength) ||
00269 !IsAligned((LONG) pBuffer)) {
00270 return VFW_E_BADALIGN;
00271 }
00272
00273 CAsyncRequest request;
00274
00275 HRESULT hr = request.Request(
00276 this,
00277 m_pStream,
00278 llPos,
00279 lLength,
00280 TRUE,
00281 pBuffer,
00282 pvContext,
00283 0);
00284
00285 if (FAILED(hr)) {
00286 return hr;
00287 }
00288
00289 hr = request.Complete();
00290
00291
00292 *pcbActual = request.GetActualLength();
00293 return hr;
00294 }
00295
00296 HRESULT
00297 CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG* pllAvailable)
00298 {
00299 *pllTotal = m_pStream->Size(pllAvailable);
00300 return S_OK;
00301 }
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318 HRESULT
00319 CAsyncIo::BeginFlush()
00320 {
00321
00322 {
00323 CAutoLock lock(&m_csLists);
00324
00325
00326
00327
00328
00329 m_bFlushing = TRUE;
00330
00331 CAsyncRequest * preq;
00332 while((preq = GetWorkItem()) != 0) {
00333 preq->Cancel();
00334 PutDoneItem(preq);
00335 }
00336
00337
00338 if (m_cItemsOut > 0) {
00339
00340
00341 ASSERT(!m_bWaiting);
00342
00343
00344
00345
00346 m_bWaiting = TRUE;
00347 } else {
00348
00349
00350
00351
00352
00353
00354 m_evDone.Set();
00355 return S_OK;
00356 }
00357 }
00358
00359 ASSERT(m_bWaiting);
00360
00361
00362 for (;;) {
00363 m_evAllDone.Wait();
00364 {
00365
00366 CAutoLock lock(&m_csLists);
00367
00368 if (m_cItemsOut == 0) {
00369
00370
00371
00372 m_bWaiting = FALSE;
00373
00374
00375
00376
00377
00378 m_evDone.Set();
00379
00380 return S_OK;
00381 }
00382 }
00383 }
00384 }
00385
00386
00387 HRESULT
00388 CAsyncIo::EndFlush()
00389 {
00390 CAutoLock lock(&m_csLists);
00391
00392 m_bFlushing = FALSE;
00393
00394 ASSERT(!m_bWaiting);
00395
00396
00397
00398 if (m_listDone.GetCount() > 0) {
00399 m_evDone.Set();
00400 } else {
00401 m_evDone.Reset();
00402 }
00403
00404 return S_OK;
00405 }
00406
00407
00408 HRESULT
00409 CAsyncIo::StartThread(void)
00410 {
00411 if (m_hThread) {
00412 return S_OK;
00413 }
00414
00415
00416 m_evStop.Reset();
00417
00418 DWORD dwThreadID;
00419 m_hThread = CreateThread(
00420 NULL,
00421 0,
00422 InitialThreadProc,
00423 this,
00424 0,
00425 &dwThreadID);
00426 if (!m_hThread) {
00427 DWORD dwErr = GetLastError();
00428 return HRESULT_FROM_WIN32(dwErr);
00429 }
00430 return S_OK;
00431 }
00432
00433
00434 HRESULT
00435 CAsyncIo::CloseThread(void)
00436 {
00437
00438 m_evStop.Set();
00439
00440 if (m_hThread) {
00441
00442 WaitForSingleObject(m_hThread, INFINITE);
00443 CloseHandle(m_hThread);
00444 m_hThread = NULL;
00445 }
00446 return S_OK;
00447 }
00448
00449
00450
00451
00452
00453
00454 CAsyncRequest*
00455 CAsyncIo::GetWorkItem()
00456 {
00457 CAutoLock lck(&m_csLists);
00458
00459 CAsyncRequest * preq = m_listWork.RemoveHead();
00460
00461
00462 if (m_listWork.GetCount() == 0) {
00463 m_evWork.Reset();
00464 }
00465 return preq;
00466 }
00467
00468
00469 CAsyncRequest*
00470 CAsyncIo::GetDoneItem()
00471 {
00472 CAutoLock lock(&m_csLists);
00473
00474 CAsyncRequest * preq = m_listDone.RemoveHead();
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486 if (m_listDone.GetCount() == 0 &&
00487 (!m_bFlushing || m_bWaiting)) {
00488 m_evDone.Reset();
00489 }
00490
00491 return preq;
00492 }
00493
00494
00495 HRESULT
00496 CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
00497 {
00498 CAutoLock lock(&m_csLists);
00499 HRESULT hr;
00500
00501 if (m_bFlushing) {
00502 hr = VFW_E_WRONG_STATE;
00503 }
00504 else if (m_listWork.AddTail(pRequest)) {
00505
00506
00507 m_evWork.Set();
00508
00509
00510 hr = StartThread();
00511
00512 } else {
00513 hr = E_OUTOFMEMORY;
00514 }
00515
00516 return(hr);
00517 }
00518
00519
00520
00521 HRESULT
00522 CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
00523 {
00524 ASSERT(CritCheckIn(&m_csLists));
00525
00526 if (m_listDone.AddTail(pRequest)) {
00527
00528
00529 m_evDone.Set();
00530 return S_OK;
00531 } else {
00532 return E_OUTOFMEMORY;
00533 }
00534 }
00535
00536
00537 void
00538 CAsyncIo::ProcessRequests(void)
00539 {
00540
00541 CAsyncRequest * preq = NULL;
00542 for (;;) {
00543 {
00544 CAutoLock lock(&m_csLists);
00545
00546 preq = GetWorkItem();
00547 if (preq == NULL) {
00548
00549 return;
00550 }
00551
00552
00553 m_cItemsOut++;
00554
00555
00556 }
00557
00558 preq->Complete();
00559
00560
00561 {
00562 CAutoLock l(&m_csLists);
00563
00564 PutDoneItem(preq);
00565
00566 if (--m_cItemsOut == 0) {
00567 if (m_bWaiting) {
00568 m_evAllDone.Set();
00569 }
00570 }
00571 }
00572 }
00573 }
00574
00575
00576
00577 DWORD
00578 CAsyncIo::ThreadProc(void)
00579 {
00580 HANDLE ahev[] = {m_evStop, m_evWork};
00581
00582 for (;;) {
00583 DWORD dw = WaitForMultipleObjects(
00584 2,
00585 ahev,
00586 FALSE,
00587 INFINITE);
00588 if (dw == WAIT_OBJECT_0+1) {
00589
00590
00591 ProcessRequests();
00592 } else {
00593
00594 return 0;
00595 }
00596 }
00597 }
00598
00599
00600
00601
00602
00603 HRESULT
00604 CAsyncIo::SyncRead(
00605 LONGLONG llPos,
00606 LONG lLength,
00607 BYTE* pBuffer)
00608 {
00609 if (IsAligned(llPos) &&
00610 IsAligned(lLength) &&
00611 IsAligned((LONG) pBuffer)) {
00612 LONG cbUnused;
00613 return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
00614 }
00615
00616
00618
00619 CAsyncRequest request;
00620
00621 HRESULT hr = request.Request(
00622 this,
00623 m_pStream,
00624 llPos,
00625 lLength,
00626 FALSE,
00627 pBuffer,
00628 NULL,
00629 0);
00630
00631 if (FAILED(hr)) {
00632 return hr;
00633 }
00634
00635 return request.Complete();
00636 }
00637
00638
00639 HRESULT
00640 CAsyncIo::Alignment(LONG *pl)
00641 {
00642 *pl = Alignment();
00643 return S_OK;
00644 }