|
MythTV
0.26-pre
|
00001 // ANSI C headers 00002 #include <cstdio> 00003 #include <cstdlib> 00004 #include <cerrno> 00005 00006 // Unix C headers 00007 #include <sys/types.h> 00008 #include <sys/stat.h> 00009 #include <unistd.h> 00010 #include <signal.h> 00011 #include <fcntl.h> 00012 #include <string.h> 00013 00014 // Qt headers 00015 #include <QString> 00016 00017 // MythTV headers 00018 #include "ThreadedFileWriter.h" 00019 #include "mythlogging.h" 00020 00021 #include "mythtimer.h" 00022 #include "compat.h" 00023 00024 #define LOC QString("TFW(%1:%2): ").arg(filename).arg(fd) 00025 00027 void TFWWriteThread::run(void) 00028 { 00029 RunProlog(); 00030 m_parent->DiskLoop(); 00031 RunEpilog(); 00032 } 00033 00035 void TFWSyncThread::run(void) 00036 { 00037 RunProlog(); 00038 m_parent->SyncLoop(); 00039 RunEpilog(); 00040 } 00041 00042 const uint ThreadedFileWriter::kMaxBufferSize = 128 * 1024 * 1024; 00043 const uint ThreadedFileWriter::kMinWriteSize = 64 * 1024; 00044 00059 ThreadedFileWriter::ThreadedFileWriter(const QString &fname, 00060 int pflags, mode_t pmode) : 00061 // file stuff 00062 filename(fname), flags(pflags), 00063 mode(pmode), fd(-1), 00064 // state 00065 flush(false), in_dtor(false), 00066 ignore_writes(false), tfw_min_write_size(kMinWriteSize), 00067 totalBufferUse(0), 00068 // threads 00069 writeThread(NULL), syncThread(NULL) 00070 { 00071 filename.detach(); 00072 } 00073 00080 bool ThreadedFileWriter::ReOpen(QString newFilename) 00081 { 00082 Flush(); 00083 00084 buflock.lock(); 00085 00086 if (fd >= 0) 00087 { 00088 close(fd); 00089 fd = -1; 00090 } 00091 00092 if (!newFilename.isEmpty()) 00093 filename = newFilename; 00094 00095 buflock.unlock(); 00096 00097 return Open(); 00098 } 00099 00104 bool ThreadedFileWriter::Open(void) 00105 { 00106 ignore_writes = false; 00107 00108 if (filename == "-") 00109 fd = fileno(stdout); 00110 else 00111 { 00112 QByteArray fname = filename.toLocal8Bit(); 00113 fd = open(fname.constData(), flags, mode); 00114 } 00115 00116 if (fd < 0) 00117 { 00118 LOG(VB_GENERAL, LOG_ERR, LOC + 00119 QString("Opening file '%1'.").arg(filename) + ENO); 00120 return false; 00121 } 00122 else 00123 { 00124 LOG(VB_FILE, LOG_INFO, LOC + "Open() successful"); 00125 00126 #ifdef USING_MINGW 00127 _setmode(fd, _O_BINARY); 00128 #endif 00129 if (!writeThread) 00130 { 00131 writeThread = new TFWWriteThread(this); 00132 writeThread->start(); 00133 } 00134 00135 if (!syncThread) 00136 { 00137 syncThread = new TFWSyncThread(this); 00138 syncThread->start(); 00139 } 00140 00141 return true; 00142 } 00143 } 00144 00148 ThreadedFileWriter::~ThreadedFileWriter() 00149 { 00150 Flush(); 00151 00152 { /* tell child threads to exit */ 00153 QMutexLocker locker(&buflock); 00154 in_dtor = true; 00155 bufferSyncWait.wakeAll(); 00156 bufferHasData.wakeAll(); 00157 } 00158 00159 if (writeThread) 00160 { 00161 writeThread->wait(); 00162 delete writeThread; 00163 writeThread = NULL; 00164 } 00165 00166 while (!writeBuffers.empty()) 00167 { 00168 delete writeBuffers.front(); 00169 writeBuffers.pop_front(); 00170 } 00171 00172 while (!emptyBuffers.empty()) 00173 { 00174 delete emptyBuffers.front(); 00175 emptyBuffers.pop_front(); 00176 } 00177 00178 if (syncThread) 00179 { 00180 syncThread->wait(); 00181 delete syncThread; 00182 syncThread = NULL; 00183 } 00184 00185 if (fd >= 0) 00186 { 00187 close(fd); 00188 fd = -1; 00189 } 00190 } 00191 00198 uint ThreadedFileWriter::Write(const void *data, uint count) 00199 { 00200 if (count == 0) 00201 return 0; 00202 00203 QMutexLocker locker(&buflock); 00204 00205 if (ignore_writes) 00206 return count; 00207 00208 if (totalBufferUse + count > kMaxBufferSize) 00209 { 00210 LOG(VB_GENERAL, LOG_ERR, LOC + 00211 "Maximum buffer size exceeded." 00212 "\n\t\t\tfile will be truncated, no further writing " 00213 "will be done." 00214 "\n\t\t\tThis generally indicates your disk performance " 00215 "\n\t\t\tis insufficient to deal with the number of on-going " 00216 "\n\t\t\trecordings, or you have a disk failure."); 00217 ignore_writes = true; 00218 return count; 00219 } 00220 00221 TFWBuffer *buf = NULL; 00222 00223 if (!writeBuffers.empty() && 00224 (writeBuffers.back()->data.size() + count) < kMinWriteSize) 00225 { 00226 buf = writeBuffers.back(); 00227 writeBuffers.pop_back(); 00228 } 00229 else 00230 { 00231 if (!emptyBuffers.empty()) 00232 { 00233 buf = emptyBuffers.front(); 00234 emptyBuffers.pop_front(); 00235 buf->data.clear(); 00236 } 00237 else 00238 { 00239 buf = new TFWBuffer(); 00240 } 00241 } 00242 00243 totalBufferUse += count; 00244 const char *cdata = (const char*) data; 00245 buf->data.insert(buf->data.end(), cdata, cdata+count); 00246 buf->lastUsed = QDateTime::currentDateTime(); 00247 00248 writeBuffers.push_back(buf); 00249 00250 bufferHasData.wakeAll(); 00251 00252 LOG(VB_FILE, LOG_DEBUG, LOC + QString("Write(*, %1) total %2 cnt %3") 00253 .arg(count,4).arg(totalBufferUse).arg(writeBuffers.size())); 00254 00255 return count; 00256 } 00257 00269 long long ThreadedFileWriter::Seek(long long pos, int whence) 00270 { 00271 QMutexLocker locker(&buflock); 00272 flush = true; 00273 while (!writeBuffers.empty()) 00274 { 00275 bufferHasData.wakeAll(); 00276 if (!bufferEmpty.wait(locker.mutex(), 2000)) 00277 { 00278 LOG(VB_GENERAL, LOG_WARNING, LOC + 00279 QString("Taking a long time to flush.. buffer size %1") 00280 .arg(totalBufferUse)); 00281 } 00282 } 00283 flush = false; 00284 return lseek(fd, pos, whence); 00285 } 00286 00290 void ThreadedFileWriter::Flush(void) 00291 { 00292 QMutexLocker locker(&buflock); 00293 flush = true; 00294 while (!writeBuffers.empty()) 00295 { 00296 bufferHasData.wakeAll(); 00297 if (!bufferEmpty.wait(locker.mutex(), 2000)) 00298 { 00299 LOG(VB_GENERAL, LOG_WARNING, LOC + 00300 QString("Taking a long time to flush.. buffer size %1") 00301 .arg(totalBufferUse)); 00302 } 00303 } 00304 flush = false; 00305 } 00306 00327 void ThreadedFileWriter::Sync(void) 00328 { 00329 if (fd >= 0) 00330 { 00331 #if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0 00332 // fdatasync tries to avoid updating metadata, but will in 00333 // practice always update metadata if any data is written 00334 // as the file will usually have grown. 00335 fdatasync(fd); 00336 #else 00337 fsync(fd); 00338 #endif 00339 } 00340 } 00341 00346 void ThreadedFileWriter::SetWriteBufferMinWriteSize(uint newMinSize) 00347 { 00348 QMutexLocker locker(&buflock); 00349 if (newMinSize > 0) 00350 tfw_min_write_size = newMinSize; 00351 bufferHasData.wakeAll(); 00352 } 00353 00357 void ThreadedFileWriter::SyncLoop(void) 00358 { 00359 QMutexLocker locker(&buflock); 00360 while (!in_dtor) 00361 { 00362 locker.unlock(); 00363 00364 Sync(); 00365 00366 locker.relock(); 00367 bufferSyncWait.wait(&buflock, 1000); 00368 } 00369 } 00370 00374 void ThreadedFileWriter::DiskLoop(void) 00375 { 00376 #ifndef USING_MINGW 00377 // don't exit program if file gets larger than quota limit.. 00378 signal(SIGXFSZ, SIG_IGN); 00379 #endif 00380 00381 QMutexLocker locker(&buflock); 00382 00383 // Even if the bytes buffered is less than the minimum write 00384 // size we do want to write to the OS buffers periodically. 00385 // This timer makes sure we do. 00386 MythTimer minWriteTimer; 00387 minWriteTimer.start(); 00388 00389 while (!in_dtor) 00390 { 00391 if (ignore_writes) 00392 { 00393 while (!writeBuffers.empty()) 00394 { 00395 delete writeBuffers.front(); 00396 writeBuffers.pop_front(); 00397 } 00398 while (!emptyBuffers.empty()) 00399 { 00400 delete emptyBuffers.front(); 00401 emptyBuffers.pop_front(); 00402 } 00403 bufferEmpty.wakeAll(); 00404 bufferHasData.wait(locker.mutex()); 00405 continue; 00406 } 00407 00408 if (writeBuffers.empty()) 00409 { 00410 bufferEmpty.wakeAll(); 00411 bufferHasData.wait(locker.mutex(), 1000); 00412 TrimEmptyBuffers(); 00413 continue; 00414 } 00415 00416 int mwte = minWriteTimer.elapsed(); 00417 if (!flush && (mwte < 250) && (totalBufferUse < kMinWriteSize)) 00418 { 00419 bufferHasData.wait(locker.mutex(), 250 - mwte); 00420 TrimEmptyBuffers(); 00421 continue; 00422 } 00423 00424 if (fd == -1) 00425 { 00426 bufferHasData.wait(locker.mutex(), 200); 00427 TrimEmptyBuffers(); 00428 continue; 00429 } 00430 00431 TFWBuffer *buf = writeBuffers.front(); 00432 writeBuffers.pop_front(); 00433 totalBufferUse -= buf->data.size(); 00434 minWriteTimer.start(); 00435 00437 00438 const void *data = &(buf->data[0]); 00439 uint sz = buf->data.size(); 00440 00441 bool write_ok = true; 00442 uint tot = 0; 00443 uint errcnt = 0; 00444 00445 LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3") 00446 .arg(sz).arg(writeBuffers.size()) 00447 .arg(totalBufferUse)); 00448 00449 MythTimer writeTimer; 00450 writeTimer.start(); 00451 00452 while ((tot < sz) && !in_dtor) 00453 { 00454 locker.unlock(); 00455 00456 int ret = write(fd, (char *)data + tot, sz - tot); 00457 00458 if (ret < 0) 00459 { 00460 if (errno == EAGAIN) 00461 { 00462 LOG(VB_GENERAL, LOG_WARNING, LOC + "Got EAGAIN."); 00463 } 00464 else 00465 { 00466 errcnt++; 00467 LOG(VB_GENERAL, LOG_ERR, LOC + "File I/O " + 00468 QString(" errcnt: %1").arg(errcnt) + ENO); 00469 } 00470 00471 if ((errcnt >= 3) || (ENOSPC == errno) || (EFBIG == errno)) 00472 { 00473 locker.relock(); 00474 write_ok = false; 00475 break; 00476 } 00477 } 00478 else 00479 { 00480 tot += ret; 00481 } 00482 00483 locker.relock(); 00484 00485 if (!in_dtor) 00486 bufferHasData.wait(locker.mutex(), 50); 00487 } 00488 00490 00491 buf->lastUsed = QDateTime::currentDateTime(); 00492 emptyBuffers.push_back(buf); 00493 00494 if (writeTimer.elapsed() > 1000) 00495 { 00496 LOG(VB_GENERAL, LOG_WARNING, LOC + 00497 QString("write(%1) cnt %2 total %3 -- took a long time, %4 ms") 00498 .arg(sz).arg(writeBuffers.size()) 00499 .arg(totalBufferUse).arg(writeTimer.elapsed())); 00500 } 00501 00502 if (!write_ok && ((EFBIG == errno) || (ENOSPC == errno))) 00503 { 00504 QString msg; 00505 switch (errno) 00506 { 00507 case EFBIG: 00508 msg = 00509 "Maximum file size exceeded by '%1'" 00510 "\n\t\t\t" 00511 "You must either change the process ulimits, configure" 00512 "\n\t\t\t" 00513 "your operating system with \"Large File\" support, " 00514 "or use" 00515 "\n\t\t\t" 00516 "a filesystem which supports 64-bit or 128-bit files." 00517 "\n\t\t\t" 00518 "HINT: FAT32 is a 32-bit filesystem."; 00519 break; 00520 case ENOSPC: 00521 msg = 00522 "No space left on the device for file '%1'" 00523 "\n\t\t\t" 00524 "file will be truncated, no further writing " 00525 "will be done."; 00526 break; 00527 } 00528 00529 LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(filename)); 00530 ignore_writes = true; 00531 } 00532 } 00533 } 00534 00535 void ThreadedFileWriter::TrimEmptyBuffers(void) 00536 { 00537 QDateTime cur = QDateTime::currentDateTime(); 00538 QDateTime cur_m_60 = cur.addSecs(-60); 00539 00540 QList<TFWBuffer*>::iterator it = emptyBuffers.begin(); 00541 while (it != emptyBuffers.end()) 00542 { 00543 if (((*it)->lastUsed < cur_m_60) || 00544 ((*it)->data.capacity() > 3 * (*it)->data.size() && 00545 (*it)->data.capacity() > 64 * 1024)) 00546 { 00547 delete *it; 00548 it = emptyBuffers.erase(it); 00549 continue; 00550 } 00551 ++it; 00552 } 00553 }
1.7.6.1