MythTV  0.26-pre
fifowriter.cpp
Go to the documentation of this file.
00001 #include <cstdio>
00002 #include <cstdlib>
00003 #include <unistd.h>
00004 #include <fcntl.h>
00005 #include <cassert>
00006 #include <cerrno>
00007 #include <sys/time.h>
00008 #include <sys/types.h>
00009 #include <sys/stat.h>
00010 #include <ctime>
00011 #include <cmath>
00012 
00013 #include "fifowriter.h"
00014 #include "compat.h"
00015 #include "mythlogging.h"
00016 
00017 #include "mythconfig.h"
00018 #if CONFIG_DARWIN
00019     #include <sys/aio.h>    // O_SYNC
00020 #endif
00021 
00022 #include <iostream>
00023 using namespace std;
00024 
00025 FIFOWriter::FIFOWriter(int count, bool sync) :
00026     fifo_buf(NULL),
00027     fb_inptr(NULL),
00028     fb_outptr(NULL),
00029     fifothrds(NULL),
00030     fifo_lock(NULL),
00031     full_cond(NULL),
00032     empty_cond(NULL),
00033     filename(NULL),
00034     fbdesc(NULL),
00035     maxblksize(NULL),
00036     killwr(NULL),
00037     fbcount(NULL),
00038     num_fifos(count),
00039     usesync(sync)
00040 {
00041     if (count <= 0)
00042         return;
00043 
00044     fifo_buf = new struct fifo_buf *[count];
00045     fb_inptr = new struct fifo_buf *[count];
00046     fb_outptr = new struct fifo_buf *[count];
00047     fifothrds = new FIFOThread[count];
00048     fifo_lock = new QMutex[count];
00049     full_cond = new QWaitCondition[count];
00050     empty_cond = new QWaitCondition[count];
00051     filename = new QString [count];
00052     fbdesc = new QString [count];
00053     maxblksize = new long[count];
00054     killwr = new int[count];
00055     fbcount = new int[count];
00056 }
00057 
00058 FIFOWriter::~FIFOWriter()
00059 {
00060     if (num_fifos <= 0)
00061         return;
00062 
00063     for (int i = 0; i < num_fifos; i++)
00064     {
00065         QMutexLocker flock(&fifo_lock[i]);
00066         killwr[i] = 1;
00067         empty_cond[i].wakeAll();
00068     }
00069 
00070     for (int i = 0; i < num_fifos; i++)
00071     {
00072         fifothrds[i].wait();
00073     }
00074 
00075     num_fifos = 0;
00076 
00077     delete [] maxblksize;
00078     delete [] fifo_buf;
00079     delete [] fb_inptr;
00080     delete [] fb_outptr;
00081     delete [] fifothrds;
00082     delete [] full_cond;
00083     delete [] empty_cond;
00084     delete [] fifo_lock;
00085     delete [] filename;
00086     delete [] fbdesc;
00087     delete [] killwr;
00088     delete [] fbcount;
00089 }
00090 
00091 int FIFOWriter::FIFOInit(int id, QString desc, QString name, long size,
00092                          int num_bufs)
00093 {
00094     if (id < 0 || id >= num_fifos)
00095         return false;
00096 
00097     QByteArray  fname = name.toAscii();
00098     const char *aname = fname.constData();
00099     if (mkfifo(aname, S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1)
00100     {
00101         LOG(VB_GENERAL, LOG_ERR, QString("Couldn't create fifo for file: '%1'")
00102                 .arg(name) + ENO);
00103         return false;
00104     }
00105     LOG(VB_GENERAL, LOG_INFO, QString("Created %1 fifo: %2")
00106             .arg(desc).arg(name));
00107     maxblksize[id] = size;
00108     filename[id] = name;
00109     fbdesc[id] = desc;
00110     killwr[id] = 0;
00111     fbcount[id] = (usesync) ? 2 : num_bufs;
00112     fifo_buf[id] = new struct fifo_buf;
00113     struct fifo_buf *fifoptr = fifo_buf[id];
00114     for (int i = 0; i < fbcount[id]; i++)
00115     {
00116         fifoptr->data = new unsigned char[maxblksize[id]];
00117         if (i == fbcount[id] - 1)
00118             fifoptr->next = fifo_buf[id];
00119         else
00120             fifoptr->next = new struct fifo_buf;
00121         fifoptr = fifoptr->next;
00122     }
00123     fb_inptr[id]  = fifo_buf[id];
00124     fb_outptr[id] = fifo_buf[id];
00125 
00126     fifothrds[id].SetParent(this);
00127     fifothrds[id].SetId(id);
00128     fifothrds[id].start();
00129 
00130     while (0 == killwr[id] && !fifothrds[id].isRunning())
00131         usleep(1000);
00132 
00133     return fifothrds[id].isRunning();
00134 }
00135 
00136 void FIFOThread::run(void)
00137 {
00138     RunProlog();
00139     if (m_parent && m_id != -1)
00140         m_parent->FIFOWriteThread(m_id);
00141     RunEpilog();
00142 }
00143 
00144 void FIFOWriter::FIFOWriteThread(int id)
00145 {
00146     int fd = -1;
00147 
00148     QMutexLocker flock(&fifo_lock[id]);
00149     while (1)
00150     {
00151         if ((fb_inptr[id] == fb_outptr[id]) && (0 == killwr[id]))
00152             empty_cond[id].wait(flock.mutex());
00153         flock.unlock();
00154         if (killwr[id])
00155             break;
00156         if (fd < 0)
00157         {
00158             QByteArray fname = filename[id].toAscii();
00159             fd = open(fname.constData(), O_WRONLY| O_SYNC);
00160         }
00161         if (fd >= 0)
00162         {
00163             int written = 0;
00164             while (written < fb_outptr[id]->blksize)
00165             {
00166                 int ret = write(fd, fb_outptr[id]->data+written,
00167                                 fb_outptr[id]->blksize-written);
00168                 if (ret < 0)
00169                 {
00170                     LOG(VB_GENERAL, LOG_ERR,
00171                         QString("FIFOW: write failed with %1")
00172                             .arg(strerror(errno)));
00174                     break;
00175                 }
00176                 else
00177                 {
00178                     written += ret;
00179                 }
00180             }
00181         }
00182         flock.relock();
00183         fb_outptr[id] = fb_outptr[id]->next;
00184         full_cond[id].wakeAll();
00185     }
00186 
00187     if (fd != -1)
00188         close(fd);
00189 
00190     unlink(filename[id].toLocal8Bit().constData());
00191 
00192     while (fifo_buf[id]->next != fifo_buf[id])
00193     {
00194         struct fifo_buf *tmpfifo = fifo_buf[id]->next->next;
00195         delete [] fifo_buf[id]->next->data;
00196         delete fifo_buf[id]->next;
00197         fifo_buf[id]->next = tmpfifo;
00198     }
00199     delete [] fifo_buf[id]->data;
00200     delete fifo_buf[id];
00201 }
00202 
00203 void FIFOWriter::FIFOWrite(int id, void *buffer, long blksize)
00204 {
00205     QMutexLocker flock(&fifo_lock[id]);
00206     while (fb_inptr[id]->next == fb_outptr[id])
00207     {
00208         bool blocking = false;
00209         if (!usesync)
00210         {
00211             for(int i = 0; i < num_fifos; i++)
00212             {
00213                 if (i == id)
00214                     continue;
00215                 if (fb_inptr[i] == fb_outptr[i])
00216                     blocking = true;
00217             }
00218         }
00219 
00220         if (blocking)
00221         {
00222             struct fifo_buf *tmpfifo;
00223             tmpfifo = fb_inptr[id]->next;
00224             fb_inptr[id]->next = new struct fifo_buf;
00225             fb_inptr[id]->next->data = new unsigned char[maxblksize[id]];
00226             fb_inptr[id]->next->next = tmpfifo;
00227             QString msg = QString("allocating additonal buffer for : %1(%2)")
00228                           .arg(fbdesc[id]).arg(++fbcount[id]);
00229             LOG(VB_FILE, LOG_INFO, msg);
00230         }
00231         else
00232         {
00233             full_cond[id].wait(flock.mutex(), 1000);
00234         }
00235     }
00236     if (blksize > maxblksize[id])
00237     {
00238         delete [] fb_inptr[id]->data;
00239         fb_inptr[id]->data = new unsigned char[blksize];
00240     }
00241     memcpy(fb_inptr[id]->data,buffer,blksize);
00242     fb_inptr[id]->blksize = blksize;
00243     fb_inptr[id] = fb_inptr[id]->next;
00244     empty_cond[id].wakeAll();
00245 }
00246 
00247 void FIFOWriter::FIFODrain(void)
00248 {
00249     int count = 0;
00250     while (count < num_fifos)
00251     {
00252         count = 0;
00253         for (int i = 0; i < num_fifos; i++)
00254         {
00255             QMutexLocker flock(&fifo_lock[i]);
00256             if (fb_inptr[i] == fb_outptr[i])
00257             {
00258                 killwr[i] = 1;
00259                 empty_cond[i].wakeAll();
00260                 count++;
00261             }
00262         }
00263         usleep(1000);
00264     }
00265 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends