MythTV  0.26-pre
DeviceReadBuffer.cpp
Go to the documentation of this file.
#include <algorithm>
#include <new>
using namespace std;

#include "DeviceReadBuffer.h"
#include "mythcorecontext.h"
#include "mythbaseutil.h"
#include "mythlogging.h"
#include "tspacket.h"
#include "mthread.h"
#include "compat.h"

#ifndef USING_MINGW
#include <sys/poll.h>
#endif
00015 
00017 #define REPORT_RING_STATS 0
00018 
00019 #define LOC QString("DevRdB(%1): ").arg(videodevice)
00020 
00021 DeviceReadBuffer::DeviceReadBuffer(
00022     DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
00023     : MThread("DeviceReadBuffer"),
00024       videodevice(""),              _stream_fd(-1),
00025       readerCB(cb),
00026 
00027       // Data for managing the device ringbuffer
00028       dorun(false),
00029       eof(false),                   error(false),
00030       request_pause(false),         paused(false),
00031       using_poll(use_poll),
00032       poll_timeout_is_error(error_exit_on_poll_timeout),
00033       max_poll_wait(2500 /*ms*/),
00034 
00035       size(0),                      used(0),
00036       read_quanta(0),
00037       dev_read_size(0),             min_read(0),
00038 
00039       buffer(NULL),                 readPtr(NULL),
00040       writePtr(NULL),               endPtr(NULL),
00041 
00042       // statistics
00043       max_used(0),                  avg_used(0),
00044       avg_cnt(0)
00045 {
00046     for (int i = 0; i < 2; i++)
00047     {
00048         wake_pipe[i] = -1;
00049         wake_pipe_flags[i] = 0;
00050     }
00051 
00052 #if defined( USING_MINGW ) && !defined( _MSC_VER )
00053 #warning mingw DeviceReadBuffer::Poll
00054     if (using_poll)
00055     {
00056         LOG(VB_GENERAL, LOG_WARNING, LOC +
00057             "mingw DeviceReadBuffer::Poll is not implemented");
00058         using_poll = false;
00059     }
00060 #endif
00061 }
00062 
00063 DeviceReadBuffer::~DeviceReadBuffer()
00064 {
00065     Stop();
00066     if (buffer)
00067     {
00068         delete[] buffer;
00069         buffer = NULL;
00070     }
00071 }
00072 
00073 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
00074                              uint readQuanta, uint deviceBufferSize)
00075 {
00076     QMutexLocker locker(&lock);
00077 
00078     if (buffer)
00079         delete[] buffer;
00080 
00081     videodevice   = streamName;
00082     videodevice   = (videodevice == QString::null) ? "" : videodevice;
00083     _stream_fd    = streamfd;
00084 
00085     // Setup device ringbuffer
00086     eof           = false;
00087     error         = false;
00088     request_pause = false;
00089     paused        = false;
00090 
00091     read_quanta   = (readQuanta) ? readQuanta : read_quanta;
00092     size          = gCoreContext->GetNumSetting(
00093         "HDRingbufferSize", 50 * read_quanta) * 1024;
00094     used          = 0;
00095     dev_read_size = read_quanta * (using_poll ? 256 : 48);
00096     dev_read_size = (deviceBufferSize) ?
00097         min(dev_read_size, (size_t)deviceBufferSize) : dev_read_size;
00098     min_read      = read_quanta * 4;
00099 
00100     buffer        = new unsigned char[size + dev_read_size];
00101     readPtr       = buffer;
00102     writePtr      = buffer;
00103     endPtr        = buffer + size;
00104 
00105     // Initialize buffer, if it exists
00106     if (!buffer)
00107     {
00108         LOG(VB_GENERAL, LOG_ERR, LOC +
00109             QString("Failed to allocate buffer of size %1 = %2 + %3")
00110                 .arg(size+dev_read_size).arg(size).arg(dev_read_size));
00111         return false;
00112     }
00113     memset(buffer, 0xFF, size + read_quanta);
00114 
00115     // Initialize statistics
00116     max_used      = 0;
00117     avg_used      = 0;
00118     avg_cnt       = 0;
00119     lastReport.start();
00120 
00121     LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(size/1024));
00122 
00123     return true;
00124 }
00125 
00126 void DeviceReadBuffer::Start(void)
00127 {
00128     LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
00129 
00130     QMutexLocker locker(&lock);
00131     if (isRunning() || dorun)
00132     {
00133         dorun = false;
00134         locker.unlock();
00135         WakePoll();
00136         wait();
00137         locker.relock();
00138     }
00139 
00140     dorun = true;
00141     error = false;
00142     eof   = false;
00143 
00144     start();
00145 
00146     LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
00147 
00148     while (dorun && !isRunning())
00149         runWait.wait(locker.mutex(), 100);
00150 
00151     LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
00152 }
00153 
00154 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
00155 {
00156     QMutexLocker locker(&lock);
00157 
00158     videodevice   = streamName;
00159     videodevice   = (videodevice == QString::null) ? "" : videodevice;
00160     _stream_fd    = streamfd;
00161 
00162     used          = 0;
00163     readPtr       = buffer;
00164     writePtr      = buffer;
00165 
00166     error         = false;
00167 }
00168 
00169 void DeviceReadBuffer::Stop(void)
00170 {
00171     LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
00172     QMutexLocker locker(&lock);
00173     if (isRunning() || dorun)
00174     {
00175         dorun = false;
00176         locker.unlock();
00177         WakePoll();
00178         wait();
00179     }
00180     LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
00181 }
00182 
00183 void DeviceReadBuffer::SetRequestPause(bool req)
00184 {
00185     QMutexLocker locker(&lock);
00186     request_pause = req;
00187     WakePoll();
00188 }
00189 
00190 void DeviceReadBuffer::SetPaused(bool val)
00191 {
00192     QMutexLocker locker(&lock);
00193     paused = val;
00194     if (val)
00195         pauseWait.wakeAll();
00196     else
00197         unpauseWait.wakeAll();
00198 }
00199 
00200 // The WakePoll code is copied from MythSocketThread::WakeReadyReadThread()
00201 void DeviceReadBuffer::WakePoll(void) const
00202 {
00203     char buf[1];
00204     buf[0] = '0';
00205     ssize_t wret = 0;
00206     while (isRunning() && (wret <= 0) && (wake_pipe[1] >= 0))
00207     {
00208         wret = ::write(wake_pipe[1], &buf, 1);
00209         if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
00210         {
00211             LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
00212             ClosePipes();
00213             break;
00214         }
00215     }
00216 }
00217 
00218 void DeviceReadBuffer::ClosePipes(void) const
00219 {
00220     for (uint i = 0; i < 2; i++)
00221     {
00222         if (wake_pipe[i] >= 0)
00223         {
00224             ::close(wake_pipe[i]);
00225             wake_pipe[i] = -1;
00226             wake_pipe_flags[i] = 0;
00227         }
00228     }
00229 }
00230 
00231 bool DeviceReadBuffer::IsPaused(void) const
00232 {
00233     QMutexLocker locker(&lock);
00234     return paused;
00235 }
00236 
00237 bool DeviceReadBuffer::WaitForPaused(unsigned long timeout)
00238 {
00239     QMutexLocker locker(&lock);
00240 
00241     if (!paused)
00242         pauseWait.wait(&lock, timeout);
00243 
00244     return paused;
00245 }
00246 
00247 bool DeviceReadBuffer::WaitForUnpause(unsigned long timeout)
00248 {
00249     QMutexLocker locker(&lock);
00250 
00251     if (paused)
00252         unpauseWait.wait(&lock, timeout);
00253 
00254     return paused;
00255 }
00256 
00257 bool DeviceReadBuffer::IsPauseRequested(void) const
00258 {
00259     QMutexLocker locker(&lock);
00260     return request_pause;
00261 }
00262 
00263 bool DeviceReadBuffer::IsErrored(void) const
00264 {
00265     QMutexLocker locker(&lock);
00266     return error;
00267 }
00268 
00269 bool DeviceReadBuffer::IsEOF(void) const
00270 {
00271     QMutexLocker locker(&lock);
00272     return eof;
00273 }
00274 
00275 bool DeviceReadBuffer::IsRunning(void) const
00276 {
00277     QMutexLocker locker(&lock);
00278     return isRunning();
00279 }
00280 
00281 uint DeviceReadBuffer::GetUnused(void) const
00282 {
00283     QMutexLocker locker(&lock);
00284     return size - used;
00285 }
00286 
00287 uint DeviceReadBuffer::GetUsed(void) const
00288 {
00289     QMutexLocker locker(&lock);
00290     return used;
00291 }
00292 
00293 uint DeviceReadBuffer::GetContiguousUnused(void) const
00294 {
00295     QMutexLocker locker(&lock);
00296     return endPtr - writePtr;
00297 }
00298 
00299 void DeviceReadBuffer::IncrWritePointer(uint len)
00300 {
00301     QMutexLocker locker(&lock);
00302     used     += len;
00303     writePtr += len;
00304     writePtr  = (writePtr >= endPtr) ? buffer + (writePtr - endPtr) : writePtr;
00305 #if REPORT_RING_STATS
00306     max_used = max(used, max_used);
00307     avg_used = ((avg_used * avg_cnt) + used) / ++avg_cnt;
00308 #endif
00309     dataWait.wakeAll();
00310 }
00311 
00312 void DeviceReadBuffer::IncrReadPointer(uint len)
00313 {
00314     QMutexLocker locker(&lock);
00315     used    -= len;
00316     readPtr += len;
00317     readPtr  = (readPtr == endPtr) ? buffer : readPtr;
00318 }
00319 
00320 void DeviceReadBuffer::run(void)
00321 {
00322     RunProlog();
00323 
00324     uint      errcnt = 0;
00325 
00326     lock.lock();
00327     runWait.wakeAll();
00328     lock.unlock();
00329 
00330     if (using_poll)
00331         setup_pipe(wake_pipe, wake_pipe_flags);
00332 
00333     while (dorun)
00334     {
00335         if (!HandlePausing())
00336             continue;
00337 
00338         if (!IsOpen())
00339         {
00340             usleep(5000);
00341             continue;
00342         }
00343 
00344         if (using_poll && !Poll())
00345             continue;
00346 
00347         {
00348             QMutexLocker locker(&lock);
00349             if (error)
00350             {
00351                 LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
00352                 break;
00353             }
00354         }
00355 
00356         // Limit read size for faster return from read
00357         size_t unused = (size_t) WaitForUnused(read_quanta);
00358         size_t read_size = min(dev_read_size, unused);
00359 
00360         // if read_size > 0 do the read...
00361         if (read_size)
00362         {
00363             ssize_t len = read(_stream_fd, writePtr, read_size);
00364             if (!CheckForErrors(len, read_size, errcnt))
00365             {
00366                 if (errcnt > 5)
00367                     break;
00368                 else
00369                     continue;
00370             }
00371             errcnt = 0;
00372             // if we wrote past the official end of the buffer, copy to start
00373             if (writePtr + len > endPtr)
00374                 memcpy(buffer, endPtr, writePtr + len - endPtr);
00375             IncrWritePointer(len);
00376         }
00377     }
00378 
00379     ClosePipes();
00380 
00381     lock.lock();
00382     eof     = true;
00383     runWait.wakeAll();
00384     dataWait.wakeAll();
00385     pauseWait.wakeAll();
00386     unpauseWait.wakeAll();
00387     lock.unlock();
00388 
00389     RunEpilog();
00390 }
00391 
00392 bool DeviceReadBuffer::HandlePausing(void)
00393 {
00394     if (IsPauseRequested())
00395     {
00396         SetPaused(true);
00397 
00398         if (readerCB)
00399             readerCB->ReaderPaused(_stream_fd);
00400 
00401         usleep(5000);
00402         return false;
00403     }
00404     else if (IsPaused())
00405     {
00406         Reset(videodevice, _stream_fd);
00407         SetPaused(false);
00408     }
00409     return true;
00410 }
00411 
00412 bool DeviceReadBuffer::Poll(void) const
00413 {
00414 #ifdef USING_MINGW
00415 # ifdef _MSC_VER
00416 #  pragma message( "mingw DeviceReadBuffer::Poll" )
00417 # else
00418 #  warning mingw DeviceReadBuffer::Poll
00419 # endif
00420     LOG(VB_GENERAL, LOG_ERR, LOC +
00421         "mingw DeviceReadBuffer::Poll is not implemented");
00422     return false;
00423 #else
00424     bool retval = true;
00425     MythTimer timer;
00426     timer.start();
00427 
00428     int poll_cnt = 1;
00429     struct pollfd polls[2];
00430     memset(polls, 0, sizeof(polls));
00431 
00432     polls[0].fd      = _stream_fd;
00433     polls[0].events  = POLLIN | POLLPRI;
00434     polls[0].revents = 0;
00435 
00436     if (wake_pipe[0] >= 0)
00437     {
00438         poll_cnt = 2;
00439         polls[1].fd      = wake_pipe[0];
00440         polls[1].events  = POLLIN;
00441         polls[1].revents = 0;
00442     }
00443 
00444     while (true)
00445     {
00446         polls[0].revents = 0;
00447         polls[1].revents = 0;
00448         poll_cnt = (wake_pipe[0] >= 0) ? poll_cnt : 1;
00449 
00450         int timeout = max_poll_wait;
00451         if (1 == poll_cnt)
00452             timeout = 10;
00453         else if (poll_timeout_is_error)
00454             timeout = max((int)max_poll_wait - timer.elapsed(), 10);
00455 
00456         int ret = poll(polls, poll_cnt, timeout);
00457 
00458         if (polls[0].revents & (POLLHUP | POLLNVAL))
00459         {
00460             LOG(VB_GENERAL, LOG_ERR, LOC + "poll error");
00461             error = true;
00462             return true;
00463         }
00464 
00465         if (!dorun || !IsOpen() || IsPauseRequested())
00466         {
00467             retval = false;
00468             break; // are we supposed to pause, stop, etc.
00469         }
00470 
00471         if (polls[0].revents & POLLPRI)
00472         {
00473             readerCB->PriorityEvent(polls[0].fd);
00474         }
00475 
00476         if (polls[0].revents & POLLIN)
00477         {
00478             if (ret > 0)
00479                 break; // we have data to read :)
00480             else if (ret < 0)
00481             {
00482                 if ((EOVERFLOW == errno))
00483                     break; // we have an error to handle
00484 
00485                 if ((EAGAIN == errno) || (EINTR  == errno))
00486                     continue; // errors that tell you to try again
00487 
00488                 usleep(2500 /*2.5 ms*/);
00489             }
00490             else //  ret == 0
00491             {
00492                 if (poll_timeout_is_error &&
00493                     (timer.elapsed() >= (int)max_poll_wait))
00494                 {
00495                     LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
00496                     QMutexLocker locker(&lock);
00497                     error = true;
00498                     return true;
00499                 }
00500             }
00501         }
00502 
00503         // Clear out any pending pipe reads
00504         if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
00505         {
00506             char dummy[128];
00507             int cnt = (wake_pipe_flags[0] & O_NONBLOCK) ? 128 : 1;
00508             cnt = ::read(wake_pipe[0], dummy, cnt);
00509         }
00510 
00511         if (poll_timeout_is_error && (timer.elapsed() >= (int)max_poll_wait))
00512         {
00513             LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 2");
00514             QMutexLocker locker(&lock);
00515             error = true;
00516             return true;
00517         }
00518     }
00519 
00520     int e = timer.elapsed();
00521     if (e > (int)max_poll_wait)
00522     {
00523         LOG(VB_GENERAL, LOG_WARNING, LOC +
00524             QString("Poll took an unusually long time %1 ms")
00525             .arg(timer.elapsed()));
00526     }
00527 
00528     return retval;
00529 #endif //!USING_MINGW
00530 }
00531 
00532 bool DeviceReadBuffer::CheckForErrors(
00533     ssize_t len, size_t requested_len, uint &errcnt)
00534 {
00535     if (len > (ssize_t)requested_len)
00536     {
00537         LOG(VB_GENERAL, LOG_ERR, LOC +
00538             "Driver is returning bogus values on read");
00539         if (++errcnt > 5)
00540         {
00541             LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
00542             QMutexLocker locker(&lock);
00543             error = true;
00544         }
00545         return false;
00546     }
00547 
00548 #ifdef USING_MINGW
00549 # ifdef _MSC_VER
00550 #  pragma message( "mingw DeviceReadBuffer::CheckForErrors" )
00551 # else
00552 #  warning mingw DeviceReadBuffer::CheckForErrors
00553 # endif
00554     LOG(VB_GENERAL, LOG_ERR, LOC +
00555         "mingw DeviceReadBuffer::CheckForErrors is not implemented");
00556     return false;
00557 #else
00558     if (len < 0)
00559     {
00560         if (EINTR == errno)
00561             return false;
00562         if (EAGAIN == errno)
00563         {
00564             usleep(2500);
00565             return false;
00566         }
00567         if (EOVERFLOW == errno)
00568         {
00569             LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
00570             return false;
00571         }
00572 
00573         LOG(VB_GENERAL, LOG_ERR, LOC +
00574             QString("Problem reading fd(%1)").arg(_stream_fd) + ENO);
00575 
00576         if (++errcnt > 5)
00577         {
00578             LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
00579             QMutexLocker locker(&lock);
00580             error = true;
00581             return false;
00582         }
00583 
00584         usleep(500);
00585         return false;
00586     }
00587     else if (len == 0)
00588     {
00589         if (++errcnt > 5)
00590         {
00591             LOG(VB_GENERAL, LOG_ERR, LOC +
00592                 QString("End-Of-File? fd(%1)").arg(_stream_fd));
00593 
00594             lock.lock();
00595             eof = true;
00596             lock.unlock();
00597 
00598             return false;
00599         }
00600         usleep(500);
00601         return false;
00602     }
00603     return true;
00604 #endif
00605 }
00606 
00613 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
00614 {
00615     uint avail = WaitForUsed(min(count, (uint)min_read), 500);
00616     size_t cnt = min(count, avail);
00617 
00618     if (!cnt)
00619         return 0;
00620 
00621     if (readPtr + cnt > endPtr)
00622     {
00623         // Process as two pieces
00624         size_t len = endPtr - readPtr;
00625         if (len)
00626         {
00627             memcpy(buf, readPtr, len);
00628             buf += len;
00629             IncrReadPointer(len);
00630         }
00631         if (cnt > len)
00632         {
00633             len = cnt - len;
00634             memcpy(buf, readPtr, len);
00635             IncrReadPointer(len);
00636         }
00637     }
00638     else
00639     {
00640         memcpy(buf, readPtr, cnt);
00641         IncrReadPointer(cnt);
00642     }
00643 
00644 #if REPORT_RING_STATS
00645     ReportStats();
00646 #endif
00647 
00648     return cnt;
00649 }
00650 
00655 uint DeviceReadBuffer::WaitForUnused(uint needed) const
00656 {
00657     size_t unused = GetUnused();
00658 
00659     if (unused > read_quanta)
00660     {
00661         while (unused < needed)
00662         {
00663             unused = GetUnused();
00664             if (IsPauseRequested() || !IsOpen() || !dorun)
00665                 return 0;
00666             usleep(5000);
00667         }
00668         if (IsPauseRequested() || !IsOpen() || !dorun)
00669             return 0;
00670         unused = GetUnused();
00671     }
00672 
00673     return unused;
00674 }
00675 
00681 uint DeviceReadBuffer::WaitForUsed(uint needed, uint max_wait) const
00682 {
00683     MythTimer timer;
00684     timer.start();
00685 
00686     QMutexLocker locker(&lock);
00687     size_t avail = used;
00688     while ((needed > avail) && isRunning() &&
00689            !request_pause && !error && !eof &&
00690            (timer.elapsed() < (int)max_wait))
00691     {
00692         dataWait.wait(locker.mutex(), 10);
00693         avail = used;
00694     }
00695     return avail;
00696 }
00697 
00698 void DeviceReadBuffer::ReportStats(void)
00699 {
00700 #if REPORT_RING_STATS
00701     if (lastReport.elapsed() > 20*1000 /* msg every 20 seconds */)
00702     {
00703         QMutexLocker locker(&lock);
00704         double rsize = 100.0 / size;
00705         QString msg  = QString("fill avg(%1%) ").arg(avg_used*rsize,3,'f',0);
00706         msg         += QString("fill max(%2%) ").arg(max_used*rsize,3,'f',0);
00707         msg         += QString("samples(%3)").arg(avg_cnt);
00708 
00709         avg_used    = 0;
00710         avg_cnt     = 0;
00711         max_used    = 0;
00712         lastReport.start();
00713 
00714         LOG(VB_GENERAL, LOG_DEBUG, LOC + msg);
00715     }
00716 #endif
00717 }
00718 
00719 /*
00720  * vim:ts=4:sw=4:ai:et:si:sts=4
00721  */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends