|
MythTV
0.26-pre
|
00001 #include <cstdio> 00002 #include <cstdlib> 00003 #include <unistd.h> 00004 #include <fcntl.h> 00005 #include <cassert> 00006 #include <cerrno> 00007 #include <sys/time.h> 00008 #include <sys/types.h> 00009 #include <sys/stat.h> 00010 #include <ctime> 00011 #include <cmath> 00012 00013 #include "fifowriter.h" 00014 #include "compat.h" 00015 #include "mythlogging.h" 00016 00017 #include "mythconfig.h" 00018 #if CONFIG_DARWIN 00019 #include <sys/aio.h> // O_SYNC 00020 #endif 00021 00022 #include <iostream> 00023 using namespace std; 00024 00025 FIFOWriter::FIFOWriter(int count, bool sync) : 00026 fifo_buf(NULL), 00027 fb_inptr(NULL), 00028 fb_outptr(NULL), 00029 fifothrds(NULL), 00030 fifo_lock(NULL), 00031 full_cond(NULL), 00032 empty_cond(NULL), 00033 filename(NULL), 00034 fbdesc(NULL), 00035 maxblksize(NULL), 00036 killwr(NULL), 00037 fbcount(NULL), 00038 num_fifos(count), 00039 usesync(sync) 00040 { 00041 if (count <= 0) 00042 return; 00043 00044 fifo_buf = new struct fifo_buf *[count]; 00045 fb_inptr = new struct fifo_buf *[count]; 00046 fb_outptr = new struct fifo_buf *[count]; 00047 fifothrds = new FIFOThread[count]; 00048 fifo_lock = new QMutex[count]; 00049 full_cond = new QWaitCondition[count]; 00050 empty_cond = new QWaitCondition[count]; 00051 filename = new QString [count]; 00052 fbdesc = new QString [count]; 00053 maxblksize = new long[count]; 00054 killwr = new int[count]; 00055 fbcount = new int[count]; 00056 } 00057 00058 FIFOWriter::~FIFOWriter() 00059 { 00060 if (num_fifos <= 0) 00061 return; 00062 00063 for (int i = 0; i < num_fifos; i++) 00064 { 00065 QMutexLocker flock(&fifo_lock[i]); 00066 killwr[i] = 1; 00067 empty_cond[i].wakeAll(); 00068 } 00069 00070 for (int i = 0; i < num_fifos; i++) 00071 { 00072 fifothrds[i].wait(); 00073 } 00074 00075 num_fifos = 0; 00076 00077 delete [] maxblksize; 00078 delete [] fifo_buf; 00079 delete [] fb_inptr; 00080 delete [] fb_outptr; 00081 delete [] fifothrds; 00082 delete [] full_cond; 00083 delete [] empty_cond; 00084 delete [] fifo_lock; 00085 delete [] filename; 00086 delete [] fbdesc; 00087 delete [] killwr; 00088 delete [] fbcount; 00089 } 00090 00091 int FIFOWriter::FIFOInit(int id, QString desc, QString name, long size, 00092 int num_bufs) 00093 { 00094 if (id < 0 || id >= num_fifos) 00095 return false; 00096 00097 QByteArray fname = name.toAscii(); 00098 const char *aname = fname.constData(); 00099 if (mkfifo(aname, S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1) 00100 { 00101 LOG(VB_GENERAL, LOG_ERR, QString("Couldn't create fifo for file: '%1'") 00102 .arg(name) + ENO); 00103 return false; 00104 } 00105 LOG(VB_GENERAL, LOG_INFO, QString("Created %1 fifo: %2") 00106 .arg(desc).arg(name)); 00107 maxblksize[id] = size; 00108 filename[id] = name; 00109 fbdesc[id] = desc; 00110 killwr[id] = 0; 00111 fbcount[id] = (usesync) ? 2 : num_bufs; 00112 fifo_buf[id] = new struct fifo_buf; 00113 struct fifo_buf *fifoptr = fifo_buf[id]; 00114 for (int i = 0; i < fbcount[id]; i++) 00115 { 00116 fifoptr->data = new unsigned char[maxblksize[id]]; 00117 if (i == fbcount[id] - 1) 00118 fifoptr->next = fifo_buf[id]; 00119 else 00120 fifoptr->next = new struct fifo_buf; 00121 fifoptr = fifoptr->next; 00122 } 00123 fb_inptr[id] = fifo_buf[id]; 00124 fb_outptr[id] = fifo_buf[id]; 00125 00126 fifothrds[id].SetParent(this); 00127 fifothrds[id].SetId(id); 00128 fifothrds[id].start(); 00129 00130 while (0 == killwr[id] && !fifothrds[id].isRunning()) 00131 usleep(1000); 00132 00133 return fifothrds[id].isRunning(); 00134 } 00135 00136 void FIFOThread::run(void) 00137 { 00138 RunProlog(); 00139 if (m_parent && m_id != -1) 00140 m_parent->FIFOWriteThread(m_id); 00141 RunEpilog(); 00142 } 00143 00144 void FIFOWriter::FIFOWriteThread(int id) 00145 { 00146 int fd = -1; 00147 00148 QMutexLocker flock(&fifo_lock[id]); 00149 while (1) 00150 { 00151 if ((fb_inptr[id] == fb_outptr[id]) && (0 == killwr[id])) 00152 empty_cond[id].wait(flock.mutex()); 00153 flock.unlock(); 00154 if (killwr[id]) 00155 break; 00156 if (fd < 0) 00157 { 00158 QByteArray fname = filename[id].toAscii(); 00159 fd = open(fname.constData(), O_WRONLY| O_SYNC); 00160 } 00161 if (fd >= 0) 00162 { 00163 int written = 0; 00164 while (written < fb_outptr[id]->blksize) 00165 { 00166 int ret = write(fd, fb_outptr[id]->data+written, 00167 fb_outptr[id]->blksize-written); 00168 if (ret < 0) 00169 { 00170 LOG(VB_GENERAL, LOG_ERR, 00171 QString("FIFOW: write failed with %1") 00172 .arg(strerror(errno))); 00174 break; 00175 } 00176 else 00177 { 00178 written += ret; 00179 } 00180 } 00181 } 00182 flock.relock(); 00183 fb_outptr[id] = fb_outptr[id]->next; 00184 full_cond[id].wakeAll(); 00185 } 00186 00187 if (fd != -1) 00188 close(fd); 00189 00190 unlink(filename[id].toLocal8Bit().constData()); 00191 00192 while (fifo_buf[id]->next != fifo_buf[id]) 00193 { 00194 struct fifo_buf *tmpfifo = fifo_buf[id]->next->next; 00195 delete [] fifo_buf[id]->next->data; 00196 delete fifo_buf[id]->next; 00197 fifo_buf[id]->next = tmpfifo; 00198 } 00199 delete [] fifo_buf[id]->data; 00200 delete fifo_buf[id]; 00201 } 00202 00203 void FIFOWriter::FIFOWrite(int id, void *buffer, long blksize) 00204 { 00205 QMutexLocker flock(&fifo_lock[id]); 00206 while (fb_inptr[id]->next == fb_outptr[id]) 00207 { 00208 bool blocking = false; 00209 if (!usesync) 00210 { 00211 for(int i = 0; i < num_fifos; i++) 00212 { 00213 if (i == id) 00214 continue; 00215 if (fb_inptr[i] == fb_outptr[i]) 00216 blocking = true; 00217 } 00218 } 00219 00220 if (blocking) 00221 { 00222 struct fifo_buf *tmpfifo; 00223 tmpfifo = fb_inptr[id]->next; 00224 fb_inptr[id]->next = new struct fifo_buf; 00225 fb_inptr[id]->next->data = new unsigned char[maxblksize[id]]; 00226 fb_inptr[id]->next->next = tmpfifo; 00227 QString msg = QString("allocating additonal buffer for : %1(%2)") 00228 .arg(fbdesc[id]).arg(++fbcount[id]); 00229 LOG(VB_FILE, LOG_INFO, msg); 00230 } 00231 else 00232 { 00233 full_cond[id].wait(flock.mutex(), 1000); 00234 } 00235 } 00236 if (blksize > maxblksize[id]) 00237 { 00238 delete [] fb_inptr[id]->data; 00239 fb_inptr[id]->data = new unsigned char[blksize]; 00240 } 00241 memcpy(fb_inptr[id]->data,buffer,blksize); 00242 fb_inptr[id]->blksize = blksize; 00243 fb_inptr[id] = fb_inptr[id]->next; 00244 empty_cond[id].wakeAll(); 00245 } 00246 00247 void FIFOWriter::FIFODrain(void) 00248 { 00249 int count = 0; 00250 while (count < num_fifos) 00251 { 00252 count = 0; 00253 for (int i = 0; i < num_fifos; i++) 00254 { 00255 QMutexLocker flock(&fifo_lock[i]); 00256 if (fb_inptr[i] == fb_outptr[i]) 00257 { 00258 killwr[i] = 1; 00259 empty_cond[i].wakeAll(); 00260 count++; 00261 } 00262 } 00263 usleep(1000); 00264 } 00265 }
1.7.6.1