|
MythTV
0.26-pre
|
00001 #include <algorithm> 00002 using namespace std; 00003 00004 #include "DeviceReadBuffer.h" 00005 #include "mythcorecontext.h" 00006 #include "mythbaseutil.h" 00007 #include "mythlogging.h" 00008 #include "tspacket.h" 00009 #include "mthread.h" 00010 #include "compat.h" 00011 00012 #ifndef USING_MINGW 00013 #include <sys/poll.h> 00014 #endif 00015 00017 #define REPORT_RING_STATS 0 00018 00019 #define LOC QString("DevRdB(%1): ").arg(videodevice) 00020 00021 DeviceReadBuffer::DeviceReadBuffer( 00022 DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout) 00023 : MThread("DeviceReadBuffer"), 00024 videodevice(""), _stream_fd(-1), 00025 readerCB(cb), 00026 00027 // Data for managing the device ringbuffer 00028 dorun(false), 00029 eof(false), error(false), 00030 request_pause(false), paused(false), 00031 using_poll(use_poll), 00032 poll_timeout_is_error(error_exit_on_poll_timeout), 00033 max_poll_wait(2500 /*ms*/), 00034 00035 size(0), used(0), 00036 read_quanta(0), 00037 dev_read_size(0), min_read(0), 00038 00039 buffer(NULL), readPtr(NULL), 00040 writePtr(NULL), endPtr(NULL), 00041 00042 // statistics 00043 max_used(0), avg_used(0), 00044 avg_cnt(0) 00045 { 00046 for (int i = 0; i < 2; i++) 00047 { 00048 wake_pipe[i] = -1; 00049 wake_pipe_flags[i] = 0; 00050 } 00051 00052 #if defined( USING_MINGW ) && !defined( _MSC_VER ) 00053 #warning mingw DeviceReadBuffer::Poll 00054 if (using_poll) 00055 { 00056 LOG(VB_GENERAL, LOG_WARNING, LOC + 00057 "mingw DeviceReadBuffer::Poll is not implemented"); 00058 using_poll = false; 00059 } 00060 #endif 00061 } 00062 00063 DeviceReadBuffer::~DeviceReadBuffer() 00064 { 00065 Stop(); 00066 if (buffer) 00067 { 00068 delete[] buffer; 00069 buffer = NULL; 00070 } 00071 } 00072 00073 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd, 00074 uint readQuanta, uint deviceBufferSize) 00075 { 00076 QMutexLocker locker(&lock); 00077 00078 if (buffer) 00079 delete[] buffer; 00080 00081 videodevice = streamName; 00082 videodevice = (videodevice == QString::null) ? "" : videodevice; 00083 _stream_fd = streamfd; 00084 00085 // Setup device ringbuffer 00086 eof = false; 00087 error = false; 00088 request_pause = false; 00089 paused = false; 00090 00091 read_quanta = (readQuanta) ? readQuanta : read_quanta; 00092 size = gCoreContext->GetNumSetting( 00093 "HDRingbufferSize", 50 * read_quanta) * 1024; 00094 used = 0; 00095 dev_read_size = read_quanta * (using_poll ? 256 : 48); 00096 dev_read_size = (deviceBufferSize) ? 00097 min(dev_read_size, (size_t)deviceBufferSize) : dev_read_size; 00098 min_read = read_quanta * 4; 00099 00100 buffer = new unsigned char[size + dev_read_size]; 00101 readPtr = buffer; 00102 writePtr = buffer; 00103 endPtr = buffer + size; 00104 00105 // Initialize buffer, if it exists 00106 if (!buffer) 00107 { 00108 LOG(VB_GENERAL, LOG_ERR, LOC + 00109 QString("Failed to allocate buffer of size %1 = %2 + %3") 00110 .arg(size+dev_read_size).arg(size).arg(dev_read_size)); 00111 return false; 00112 } 00113 memset(buffer, 0xFF, size + read_quanta); 00114 00115 // Initialize statistics 00116 max_used = 0; 00117 avg_used = 0; 00118 avg_cnt = 0; 00119 lastReport.start(); 00120 00121 LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(size/1024)); 00122 00123 return true; 00124 } 00125 00126 void DeviceReadBuffer::Start(void) 00127 { 00128 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin"); 00129 00130 QMutexLocker locker(&lock); 00131 if (isRunning() || dorun) 00132 { 00133 dorun = false; 00134 locker.unlock(); 00135 WakePoll(); 00136 wait(); 00137 locker.relock(); 00138 } 00139 00140 dorun = true; 00141 error = false; 00142 eof = false; 00143 00144 start(); 00145 00146 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle"); 00147 00148 while (dorun && !isRunning()) 00149 runWait.wait(locker.mutex(), 100); 00150 00151 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end"); 00152 } 00153 00154 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd) 00155 { 00156 QMutexLocker locker(&lock); 00157 00158 videodevice = streamName; 00159 videodevice = (videodevice == QString::null) ? "" : videodevice; 00160 _stream_fd = streamfd; 00161 00162 used = 0; 00163 readPtr = buffer; 00164 writePtr = buffer; 00165 00166 error = false; 00167 } 00168 00169 void DeviceReadBuffer::Stop(void) 00170 { 00171 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin"); 00172 QMutexLocker locker(&lock); 00173 if (isRunning() || dorun) 00174 { 00175 dorun = false; 00176 locker.unlock(); 00177 WakePoll(); 00178 wait(); 00179 } 00180 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end"); 00181 } 00182 00183 void DeviceReadBuffer::SetRequestPause(bool req) 00184 { 00185 QMutexLocker locker(&lock); 00186 request_pause = req; 00187 WakePoll(); 00188 } 00189 00190 void DeviceReadBuffer::SetPaused(bool val) 00191 { 00192 QMutexLocker locker(&lock); 00193 paused = val; 00194 if (val) 00195 pauseWait.wakeAll(); 00196 else 00197 unpauseWait.wakeAll(); 00198 } 00199 00200 // The WakePoll code is copied from MythSocketThread::WakeReadyReadThread() 00201 void DeviceReadBuffer::WakePoll(void) const 00202 { 00203 char buf[1]; 00204 buf[0] = '0'; 00205 ssize_t wret = 0; 00206 while (isRunning() && (wret <= 0) && (wake_pipe[1] >= 0)) 00207 { 00208 wret = ::write(wake_pipe[1], &buf, 1); 00209 if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno)) 00210 { 00211 LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed."); 00212 ClosePipes(); 00213 break; 00214 } 00215 } 00216 } 00217 00218 void DeviceReadBuffer::ClosePipes(void) const 00219 { 00220 for (uint i = 0; i < 2; i++) 00221 { 00222 if (wake_pipe[i] >= 0) 00223 { 00224 ::close(wake_pipe[i]); 00225 wake_pipe[i] = -1; 00226 wake_pipe_flags[i] = 0; 00227 } 00228 } 00229 } 00230 00231 bool DeviceReadBuffer::IsPaused(void) const 00232 { 00233 QMutexLocker locker(&lock); 00234 return paused; 00235 } 00236 00237 bool DeviceReadBuffer::WaitForPaused(unsigned long timeout) 00238 { 00239 QMutexLocker locker(&lock); 00240 00241 if (!paused) 00242 pauseWait.wait(&lock, timeout); 00243 00244 return paused; 00245 } 00246 00247 bool DeviceReadBuffer::WaitForUnpause(unsigned long timeout) 00248 { 00249 QMutexLocker locker(&lock); 00250 00251 if (paused) 00252 unpauseWait.wait(&lock, timeout); 00253 00254 return paused; 00255 } 00256 00257 bool DeviceReadBuffer::IsPauseRequested(void) const 00258 { 00259 QMutexLocker locker(&lock); 00260 return request_pause; 00261 } 00262 00263 bool DeviceReadBuffer::IsErrored(void) const 00264 { 00265 QMutexLocker locker(&lock); 00266 return error; 00267 } 00268 00269 bool DeviceReadBuffer::IsEOF(void) const 00270 { 00271 QMutexLocker locker(&lock); 00272 return eof; 00273 } 00274 00275 bool DeviceReadBuffer::IsRunning(void) const 00276 { 00277 QMutexLocker locker(&lock); 00278 return isRunning(); 00279 } 00280 00281 uint DeviceReadBuffer::GetUnused(void) const 00282 { 00283 QMutexLocker locker(&lock); 00284 return size - used; 00285 } 00286 00287 uint DeviceReadBuffer::GetUsed(void) const 00288 { 00289 QMutexLocker locker(&lock); 00290 return used; 00291 } 00292 00293 uint DeviceReadBuffer::GetContiguousUnused(void) const 00294 { 00295 QMutexLocker locker(&lock); 00296 return endPtr - writePtr; 00297 } 00298 00299 void DeviceReadBuffer::IncrWritePointer(uint len) 00300 { 00301 QMutexLocker locker(&lock); 00302 used += len; 00303 writePtr += len; 00304 writePtr = (writePtr >= endPtr) ? buffer + (writePtr - endPtr) : writePtr; 00305 #if REPORT_RING_STATS 00306 max_used = max(used, max_used); 00307 avg_used = ((avg_used * avg_cnt) + used) / ++avg_cnt; 00308 #endif 00309 dataWait.wakeAll(); 00310 } 00311 00312 void DeviceReadBuffer::IncrReadPointer(uint len) 00313 { 00314 QMutexLocker locker(&lock); 00315 used -= len; 00316 readPtr += len; 00317 readPtr = (readPtr == endPtr) ? buffer : readPtr; 00318 } 00319 00320 void DeviceReadBuffer::run(void) 00321 { 00322 RunProlog(); 00323 00324 uint errcnt = 0; 00325 00326 lock.lock(); 00327 runWait.wakeAll(); 00328 lock.unlock(); 00329 00330 if (using_poll) 00331 setup_pipe(wake_pipe, wake_pipe_flags); 00332 00333 while (dorun) 00334 { 00335 if (!HandlePausing()) 00336 continue; 00337 00338 if (!IsOpen()) 00339 { 00340 usleep(5000); 00341 continue; 00342 } 00343 00344 if (using_poll && !Poll()) 00345 continue; 00346 00347 { 00348 QMutexLocker locker(&lock); 00349 if (error) 00350 { 00351 LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state"); 00352 break; 00353 } 00354 } 00355 00356 // Limit read size for faster return from read 00357 size_t unused = (size_t) WaitForUnused(read_quanta); 00358 size_t read_size = min(dev_read_size, unused); 00359 00360 // if read_size > 0 do the read... 00361 if (read_size) 00362 { 00363 ssize_t len = read(_stream_fd, writePtr, read_size); 00364 if (!CheckForErrors(len, read_size, errcnt)) 00365 { 00366 if (errcnt > 5) 00367 break; 00368 else 00369 continue; 00370 } 00371 errcnt = 0; 00372 // if we wrote past the official end of the buffer, copy to start 00373 if (writePtr + len > endPtr) 00374 memcpy(buffer, endPtr, writePtr + len - endPtr); 00375 IncrWritePointer(len); 00376 } 00377 } 00378 00379 ClosePipes(); 00380 00381 lock.lock(); 00382 eof = true; 00383 runWait.wakeAll(); 00384 dataWait.wakeAll(); 00385 pauseWait.wakeAll(); 00386 unpauseWait.wakeAll(); 00387 lock.unlock(); 00388 00389 RunEpilog(); 00390 } 00391 00392 bool DeviceReadBuffer::HandlePausing(void) 00393 { 00394 if (IsPauseRequested()) 00395 { 00396 SetPaused(true); 00397 00398 if (readerCB) 00399 readerCB->ReaderPaused(_stream_fd); 00400 00401 usleep(5000); 00402 return false; 00403 } 00404 else if (IsPaused()) 00405 { 00406 Reset(videodevice, _stream_fd); 00407 SetPaused(false); 00408 } 00409 return true; 00410 } 00411 00412 bool DeviceReadBuffer::Poll(void) const 00413 { 00414 #ifdef USING_MINGW 00415 # ifdef _MSC_VER 00416 # pragma message( "mingw DeviceReadBuffer::Poll" ) 00417 # else 00418 # warning mingw DeviceReadBuffer::Poll 00419 # endif 00420 LOG(VB_GENERAL, LOG_ERR, LOC + 00421 "mingw DeviceReadBuffer::Poll is not implemented"); 00422 return false; 00423 #else 00424 bool retval = true; 00425 MythTimer timer; 00426 timer.start(); 00427 00428 int poll_cnt = 1; 00429 struct pollfd polls[2]; 00430 memset(polls, 0, sizeof(polls)); 00431 00432 polls[0].fd = _stream_fd; 00433 polls[0].events = POLLIN | POLLPRI; 00434 polls[0].revents = 0; 00435 00436 if (wake_pipe[0] >= 0) 00437 { 00438 poll_cnt = 2; 00439 polls[1].fd = wake_pipe[0]; 00440 polls[1].events = POLLIN; 00441 polls[1].revents = 0; 00442 } 00443 00444 while (true) 00445 { 00446 polls[0].revents = 0; 00447 polls[1].revents = 0; 00448 poll_cnt = (wake_pipe[0] >= 0) ? poll_cnt : 1; 00449 00450 int timeout = max_poll_wait; 00451 if (1 == poll_cnt) 00452 timeout = 10; 00453 else if (poll_timeout_is_error) 00454 timeout = max((int)max_poll_wait - timer.elapsed(), 10); 00455 00456 int ret = poll(polls, poll_cnt, timeout); 00457 00458 if (polls[0].revents & (POLLHUP | POLLNVAL)) 00459 { 00460 LOG(VB_GENERAL, LOG_ERR, LOC + "poll error"); 00461 error = true; 00462 return true; 00463 } 00464 00465 if (!dorun || !IsOpen() || IsPauseRequested()) 00466 { 00467 retval = false; 00468 break; // are we supposed to pause, stop, etc. 00469 } 00470 00471 if (polls[0].revents & POLLPRI) 00472 { 00473 readerCB->PriorityEvent(polls[0].fd); 00474 } 00475 00476 if (polls[0].revents & POLLIN) 00477 { 00478 if (ret > 0) 00479 break; // we have data to read :) 00480 else if (ret < 0) 00481 { 00482 if ((EOVERFLOW == errno)) 00483 break; // we have an error to handle 00484 00485 if ((EAGAIN == errno) || (EINTR == errno)) 00486 continue; // errors that tell you to try again 00487 00488 usleep(2500 /*2.5 ms*/); 00489 } 00490 else // ret == 0 00491 { 00492 if (poll_timeout_is_error && 00493 (timer.elapsed() >= (int)max_poll_wait)) 00494 { 00495 LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1"); 00496 QMutexLocker locker(&lock); 00497 error = true; 00498 return true; 00499 } 00500 } 00501 } 00502 00503 // Clear out any pending pipe reads 00504 if ((poll_cnt > 1) && (polls[1].revents & POLLIN)) 00505 { 00506 char dummy[128]; 00507 int cnt = (wake_pipe_flags[0] & O_NONBLOCK) ? 128 : 1; 00508 cnt = ::read(wake_pipe[0], dummy, cnt); 00509 } 00510 00511 if (poll_timeout_is_error && (timer.elapsed() >= (int)max_poll_wait)) 00512 { 00513 LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 2"); 00514 QMutexLocker locker(&lock); 00515 error = true; 00516 return true; 00517 } 00518 } 00519 00520 int e = timer.elapsed(); 00521 if (e > (int)max_poll_wait) 00522 { 00523 LOG(VB_GENERAL, LOG_WARNING, LOC + 00524 QString("Poll took an unusually long time %1 ms") 00525 .arg(timer.elapsed())); 00526 } 00527 00528 return retval; 00529 #endif //!USING_MINGW 00530 } 00531 00532 bool DeviceReadBuffer::CheckForErrors( 00533 ssize_t len, size_t requested_len, uint &errcnt) 00534 { 00535 if (len > (ssize_t)requested_len) 00536 { 00537 LOG(VB_GENERAL, LOG_ERR, LOC + 00538 "Driver is returning bogus values on read"); 00539 if (++errcnt > 5) 00540 { 00541 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors."); 00542 QMutexLocker locker(&lock); 00543 error = true; 00544 } 00545 return false; 00546 } 00547 00548 #ifdef USING_MINGW 00549 # ifdef _MSC_VER 00550 # pragma message( "mingw DeviceReadBuffer::CheckForErrors" ) 00551 # else 00552 # warning mingw DeviceReadBuffer::CheckForErrors 00553 # endif 00554 LOG(VB_GENERAL, LOG_ERR, LOC + 00555 "mingw DeviceReadBuffer::CheckForErrors is not implemented"); 00556 return false; 00557 #else 00558 if (len < 0) 00559 { 00560 if (EINTR == errno) 00561 return false; 00562 if (EAGAIN == errno) 00563 { 00564 usleep(2500); 00565 return false; 00566 } 00567 if (EOVERFLOW == errno) 00568 { 00569 LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed"); 00570 return false; 00571 } 00572 00573 LOG(VB_GENERAL, LOG_ERR, LOC + 00574 QString("Problem reading fd(%1)").arg(_stream_fd) + ENO); 00575 00576 if (++errcnt > 5) 00577 { 00578 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors."); 00579 QMutexLocker locker(&lock); 00580 error = true; 00581 return false; 00582 } 00583 00584 usleep(500); 00585 return false; 00586 } 00587 else if (len == 0) 00588 { 00589 if (++errcnt > 5) 00590 { 00591 LOG(VB_GENERAL, LOG_ERR, LOC + 00592 QString("End-Of-File? fd(%1)").arg(_stream_fd)); 00593 00594 lock.lock(); 00595 eof = true; 00596 lock.unlock(); 00597 00598 return false; 00599 } 00600 usleep(500); 00601 return false; 00602 } 00603 return true; 00604 #endif 00605 } 00606 00613 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count) 00614 { 00615 uint avail = WaitForUsed(min(count, (uint)min_read), 500); 00616 size_t cnt = min(count, avail); 00617 00618 if (!cnt) 00619 return 0; 00620 00621 if (readPtr + cnt > endPtr) 00622 { 00623 // Process as two pieces 00624 size_t len = endPtr - readPtr; 00625 if (len) 00626 { 00627 memcpy(buf, readPtr, len); 00628 buf += len; 00629 IncrReadPointer(len); 00630 } 00631 if (cnt > len) 00632 { 00633 len = cnt - len; 00634 memcpy(buf, readPtr, len); 00635 IncrReadPointer(len); 00636 } 00637 } 00638 else 00639 { 00640 memcpy(buf, readPtr, cnt); 00641 IncrReadPointer(cnt); 00642 } 00643 00644 #if REPORT_RING_STATS 00645 ReportStats(); 00646 #endif 00647 00648 return cnt; 00649 } 00650 00655 uint DeviceReadBuffer::WaitForUnused(uint needed) const 00656 { 00657 size_t unused = GetUnused(); 00658 00659 if (unused > read_quanta) 00660 { 00661 while (unused < needed) 00662 { 00663 unused = GetUnused(); 00664 if (IsPauseRequested() || !IsOpen() || !dorun) 00665 return 0; 00666 usleep(5000); 00667 } 00668 if (IsPauseRequested() || !IsOpen() || !dorun) 00669 return 0; 00670 unused = GetUnused(); 00671 } 00672 00673 return unused; 00674 } 00675 00681 uint DeviceReadBuffer::WaitForUsed(uint needed, uint max_wait) const 00682 { 00683 MythTimer timer; 00684 timer.start(); 00685 00686 QMutexLocker locker(&lock); 00687 size_t avail = used; 00688 while ((needed > avail) && isRunning() && 00689 !request_pause && !error && !eof && 00690 (timer.elapsed() < (int)max_wait)) 00691 { 00692 dataWait.wait(locker.mutex(), 10); 00693 avail = used; 00694 } 00695 return avail; 00696 } 00697 00698 void DeviceReadBuffer::ReportStats(void) 00699 { 00700 #if REPORT_RING_STATS 00701 if (lastReport.elapsed() > 20*1000 /* msg every 20 seconds */) 00702 { 00703 QMutexLocker locker(&lock); 00704 double rsize = 100.0 / size; 00705 QString msg = QString("fill avg(%1%) ").arg(avg_used*rsize,3,'f',0); 00706 msg += QString("fill max(%2%) ").arg(max_used*rsize,3,'f',0); 00707 msg += QString("samples(%3)").arg(avg_cnt); 00708 00709 avg_used = 0; 00710 avg_cnt = 0; 00711 max_used = 0; 00712 lastReport.start(); 00713 00714 LOG(VB_GENERAL, LOG_DEBUG, LOC + msg); 00715 } 00716 #endif 00717 } 00718 00719 /* 00720 * vim:ts=4:sw=4:ai:et:si:sts=4 00721 */
1.7.6.1