MythTV  0.26-pre
ThreadedFileWriter.cpp
Go to the documentation of this file.
00001 // ANSI C headers
00002 #include <cstdio>
00003 #include <cstdlib>
00004 #include <cerrno>
00005 
00006 // Unix C headers
00007 #include <sys/types.h>
00008 #include <sys/stat.h>
00009 #include <unistd.h>
00010 #include <signal.h>
00011 #include <fcntl.h>
00012 #include <string.h>
00013 
00014 // Qt headers
00015 #include <QString>
00016 
00017 // MythTV headers
00018 #include "ThreadedFileWriter.h"
00019 #include "mythlogging.h"
00020 
00021 #include "mythtimer.h"
00022 #include "compat.h"
00023 
// Log-message prefix of the form "TFW(<filename>:<fd>): ". The macro expands
// to an expression that reads `filename` and `fd`, so it can only be used
// where both names are in scope.
00024 #define LOC QString("TFW(%1:%2): ").arg(filename).arg(fd)
00025 
00027 void TFWWriteThread::run(void)
00028 {
00029     RunProlog();
00030     m_parent->DiskLoop();
00031     RunEpilog();
00032 }
00033 
00035 void TFWSyncThread::run(void)
00036 {
00037     RunProlog();
00038     m_parent->SyncLoop();
00039     RunEpilog();
00040 }
00041 
00042 const uint ThreadedFileWriter::kMaxBufferSize = 128 * 1024 * 1024;
00043 const uint ThreadedFileWriter::kMinWriteSize = 64 * 1024;
00044 
00059 ThreadedFileWriter::ThreadedFileWriter(const QString &fname,
00060                                        int pflags, mode_t pmode) :
00061     // file stuff
00062     filename(fname),                     flags(pflags),
00063     mode(pmode),                         fd(-1),
00064     // state
00065     flush(false),                        in_dtor(false),
00066     ignore_writes(false),                tfw_min_write_size(kMinWriteSize),
00067     totalBufferUse(0),
00068     // threads
00069     writeThread(NULL),                   syncThread(NULL)
00070 {
00071     filename.detach();
00072 }
00073 
00080 bool ThreadedFileWriter::ReOpen(QString newFilename)
00081 {
00082     Flush();
00083 
00084     buflock.lock();
00085 
00086     if (fd >= 0)
00087     {
00088         close(fd);
00089         fd = -1;
00090     }
00091 
00092     if (!newFilename.isEmpty())
00093         filename = newFilename;
00094 
00095     buflock.unlock();
00096 
00097     return Open();
00098 }
00099 
00104 bool ThreadedFileWriter::Open(void)
00105 {
00106     ignore_writes = false;
00107 
00108     if (filename == "-")
00109         fd = fileno(stdout);
00110     else
00111     {
00112         QByteArray fname = filename.toLocal8Bit();
00113         fd = open(fname.constData(), flags, mode);
00114     }
00115 
00116     if (fd < 0)
00117     {
00118         LOG(VB_GENERAL, LOG_ERR, LOC +
00119             QString("Opening file '%1'.").arg(filename) + ENO);
00120         return false;
00121     }
00122     else
00123     {
00124         LOG(VB_FILE, LOG_INFO, LOC + "Open() successful");
00125 
00126 #ifdef USING_MINGW
00127         _setmode(fd, _O_BINARY);
00128 #endif
00129         if (!writeThread)
00130         {
00131             writeThread = new TFWWriteThread(this);
00132             writeThread->start();
00133         }
00134 
00135         if (!syncThread)
00136         {
00137             syncThread = new TFWSyncThread(this);
00138             syncThread->start();
00139         }
00140 
00141         return true;
00142     }
00143 }
00144 
00148 ThreadedFileWriter::~ThreadedFileWriter()
00149 {
00150     Flush();
00151 
00152     {  /* tell child threads to exit */
00153         QMutexLocker locker(&buflock);
00154         in_dtor = true;
00155         bufferSyncWait.wakeAll();
00156         bufferHasData.wakeAll();
00157     }
00158 
00159     if (writeThread)
00160     {
00161         writeThread->wait();
00162         delete writeThread;
00163         writeThread = NULL;
00164     }
00165 
00166     while (!writeBuffers.empty())
00167     {
00168         delete writeBuffers.front();
00169         writeBuffers.pop_front();
00170     }
00171 
00172     while (!emptyBuffers.empty())
00173     {
00174         delete emptyBuffers.front();
00175         emptyBuffers.pop_front();
00176     }
00177 
00178     if (syncThread)
00179     {
00180         syncThread->wait();
00181         delete syncThread;
00182         syncThread = NULL;
00183     }
00184 
00185     if (fd >= 0)
00186     {
00187         close(fd);
00188         fd = -1;
00189     }
00190 }
00191 
00198 uint ThreadedFileWriter::Write(const void *data, uint count)
00199 {
00200     if (count == 0)
00201         return 0;
00202 
00203     QMutexLocker locker(&buflock);
00204 
00205     if (ignore_writes)
00206         return count;
00207 
00208     if (totalBufferUse + count > kMaxBufferSize)
00209     {
00210         LOG(VB_GENERAL, LOG_ERR, LOC +
00211                 "Maximum buffer size exceeded."
00212                 "\n\t\t\tfile will be truncated, no further writing "
00213                 "will be done."
00214                 "\n\t\t\tThis generally indicates your disk performance "
00215                 "\n\t\t\tis insufficient to deal with the number of on-going "
00216                 "\n\t\t\trecordings, or you have a disk failure.");
00217         ignore_writes = true;
00218         return count;
00219     }
00220 
00221     TFWBuffer *buf = NULL;
00222 
00223     if (!writeBuffers.empty() &&
00224         (writeBuffers.back()->data.size() + count) < kMinWriteSize)
00225     {
00226         buf = writeBuffers.back();
00227         writeBuffers.pop_back();
00228     }
00229     else
00230     {
00231         if (!emptyBuffers.empty())
00232         {
00233             buf = emptyBuffers.front();
00234             emptyBuffers.pop_front();
00235             buf->data.clear();
00236         }
00237         else
00238         {
00239             buf = new TFWBuffer();
00240         }
00241     }
00242 
00243     totalBufferUse += count;
00244     const char *cdata = (const char*) data;
00245     buf->data.insert(buf->data.end(), cdata, cdata+count);
00246     buf->lastUsed = QDateTime::currentDateTime();
00247 
00248     writeBuffers.push_back(buf);
00249 
00250     bufferHasData.wakeAll();
00251 
00252     LOG(VB_FILE, LOG_DEBUG, LOC + QString("Write(*, %1) total %2 cnt %3")
00253             .arg(count,4).arg(totalBufferUse).arg(writeBuffers.size()));
00254 
00255     return count;
00256 }
00257 
00269 long long ThreadedFileWriter::Seek(long long pos, int whence)
00270 {
00271     QMutexLocker locker(&buflock);
00272     flush = true;
00273     while (!writeBuffers.empty())
00274     {
00275         bufferHasData.wakeAll();
00276         if (!bufferEmpty.wait(locker.mutex(), 2000))
00277         {
00278             LOG(VB_GENERAL, LOG_WARNING, LOC +
00279                 QString("Taking a long time to flush.. buffer size %1")
00280                     .arg(totalBufferUse));
00281         }
00282     }
00283     flush = false;
00284     return lseek(fd, pos, whence);
00285 }
00286 
00290 void ThreadedFileWriter::Flush(void)
00291 {
00292     QMutexLocker locker(&buflock);
00293     flush = true;
00294     while (!writeBuffers.empty())
00295     {
00296         bufferHasData.wakeAll();
00297         if (!bufferEmpty.wait(locker.mutex(), 2000))
00298         {
00299             LOG(VB_GENERAL, LOG_WARNING, LOC +
00300                 QString("Taking a long time to flush.. buffer size %1")
00301                     .arg(totalBufferUse));
00302         }
00303     }
00304     flush = false;
00305 }
00306 
00327 void ThreadedFileWriter::Sync(void)
00328 {
00329     if (fd >= 0)
00330     {
00331 #if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
00332         // fdatasync tries to avoid updating metadata, but will in
00333         // practice always update metadata if any data is written
00334         // as the file will usually have grown.
00335         fdatasync(fd);
00336 #else
00337         fsync(fd);
00338 #endif
00339     }
00340 }
00341 
00346 void ThreadedFileWriter::SetWriteBufferMinWriteSize(uint newMinSize)
00347 {
00348     QMutexLocker locker(&buflock);
00349     if (newMinSize > 0)
00350         tfw_min_write_size = newMinSize;
00351     bufferHasData.wakeAll();
00352 }
00353 
00357 void ThreadedFileWriter::SyncLoop(void)
00358 {
00359     QMutexLocker locker(&buflock);
00360     while (!in_dtor)
00361     {
00362         locker.unlock();
00363 
00364         Sync();
00365 
00366         locker.relock();
00367         bufferSyncWait.wait(&buflock, 1000);
00368     }
00369 }
00370 
00374 void ThreadedFileWriter::DiskLoop(void)
00375 {
00376 #ifndef USING_MINGW
00377     // don't exit program if file gets larger than quota limit..
00378     signal(SIGXFSZ, SIG_IGN);
00379 #endif
00380 
00381     QMutexLocker locker(&buflock);
00382 
00383     // Even if the bytes buffered is less than the minimum write
00384     // size we do want to write to the OS buffers periodically.
00385     // This timer makes sure we do.
00386     MythTimer minWriteTimer;
00387     minWriteTimer.start();
00388 
00389     while (!in_dtor)
00390     {
00391         if (ignore_writes)
00392         {
00393             while (!writeBuffers.empty())
00394             {
00395                 delete writeBuffers.front();
00396                 writeBuffers.pop_front();
00397             }
00398             while (!emptyBuffers.empty())
00399             {
00400                 delete emptyBuffers.front();
00401                 emptyBuffers.pop_front();
00402             }
00403             bufferEmpty.wakeAll();
00404             bufferHasData.wait(locker.mutex());
00405             continue;
00406         }
00407 
00408         if (writeBuffers.empty())
00409         {
00410             bufferEmpty.wakeAll();
00411             bufferHasData.wait(locker.mutex(), 1000);
00412             TrimEmptyBuffers();
00413             continue;
00414         }
00415 
00416         int mwte = minWriteTimer.elapsed();
00417         if (!flush && (mwte < 250) && (totalBufferUse < kMinWriteSize))
00418         {
00419             bufferHasData.wait(locker.mutex(), 250 - mwte);
00420             TrimEmptyBuffers();
00421             continue;
00422         }
00423 
00424         if (fd == -1)
00425         {
00426             bufferHasData.wait(locker.mutex(), 200);
00427             TrimEmptyBuffers();
00428             continue;
00429         }
00430 
00431         TFWBuffer *buf = writeBuffers.front();
00432         writeBuffers.pop_front();
00433         totalBufferUse -= buf->data.size();
00434         minWriteTimer.start();
00435 
00437 
00438         const void *data = &(buf->data[0]);
00439         uint sz = buf->data.size();
00440 
00441         bool write_ok = true;
00442         uint tot = 0;
00443         uint errcnt = 0;
00444 
00445         LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3")
00446                 .arg(sz).arg(writeBuffers.size())
00447                 .arg(totalBufferUse));
00448 
00449         MythTimer writeTimer;
00450         writeTimer.start();
00451 
00452         while ((tot < sz) && !in_dtor)
00453         {
00454             locker.unlock();
00455 
00456             int ret = write(fd, (char *)data + tot, sz - tot);
00457 
00458             if (ret < 0)
00459             {
00460                 if (errno == EAGAIN)
00461                 {
00462                     LOG(VB_GENERAL, LOG_WARNING, LOC + "Got EAGAIN.");
00463                 }
00464                 else
00465                 {
00466                     errcnt++;
00467                     LOG(VB_GENERAL, LOG_ERR, LOC + "File I/O " +
00468                         QString(" errcnt: %1").arg(errcnt) + ENO);
00469                 }
00470 
00471                 if ((errcnt >= 3) || (ENOSPC == errno) || (EFBIG == errno))
00472                 {
00473                     locker.relock();
00474                     write_ok = false;
00475                     break;
00476                 }
00477             }
00478             else
00479             {
00480                 tot += ret;
00481             }
00482 
00483             locker.relock();
00484 
00485             if (!in_dtor)
00486                 bufferHasData.wait(locker.mutex(), 50);
00487         }
00488 
00490 
00491         buf->lastUsed = QDateTime::currentDateTime();
00492         emptyBuffers.push_back(buf);
00493 
00494         if (writeTimer.elapsed() > 1000)
00495         {
00496             LOG(VB_GENERAL, LOG_WARNING, LOC +
00497                 QString("write(%1) cnt %2 total %3 -- took a long time, %4 ms")
00498                     .arg(sz).arg(writeBuffers.size())
00499                     .arg(totalBufferUse).arg(writeTimer.elapsed()));
00500         }
00501 
00502         if (!write_ok && ((EFBIG == errno) || (ENOSPC == errno)))
00503         {
00504             QString msg;
00505             switch (errno)
00506             {
00507                 case EFBIG:
00508                     msg =
00509                         "Maximum file size exceeded by '%1'"
00510                         "\n\t\t\t"
00511                         "You must either change the process ulimits, configure"
00512                         "\n\t\t\t"
00513                         "your operating system with \"Large File\" support, "
00514                         "or use"
00515                         "\n\t\t\t"
00516                         "a filesystem which supports 64-bit or 128-bit files."
00517                         "\n\t\t\t"
00518                         "HINT: FAT32 is a 32-bit filesystem.";
00519                     break;
00520                 case ENOSPC:
00521                     msg =
00522                         "No space left on the device for file '%1'"
00523                         "\n\t\t\t"
00524                         "file will be truncated, no further writing "
00525                         "will be done.";
00526                     break;
00527             }
00528 
00529             LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(filename));
00530             ignore_writes = true;
00531         }
00532     }
00533 }
00534 
00535 void ThreadedFileWriter::TrimEmptyBuffers(void)
00536 {
00537     QDateTime cur = QDateTime::currentDateTime();
00538     QDateTime cur_m_60 = cur.addSecs(-60);
00539 
00540     QList<TFWBuffer*>::iterator it = emptyBuffers.begin();
00541     while (it != emptyBuffers.end())
00542     {
00543         if (((*it)->lastUsed < cur_m_60) ||
00544             ((*it)->data.capacity() > 3 * (*it)->data.size() &&
00545              (*it)->data.capacity() > 64 * 1024))
00546         {
00547             delete *it;
00548             it = emptyBuffers.erase(it);
00549             continue;
00550         }
00551         ++it;
00552     }
00553 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends