MythTV  0.26-pre
mythsocketthread.cpp
Go to the documentation of this file.
00001 // ANSI C
00002 #include <cstdlib>
00003 
00004 // C++
00005 #include <algorithm> // for min/max
00006 //using namespace std;
00007 
00008 #include "compat.h"
00009 
00010 // POSIX
00011 #ifndef USING_MINGW
00012 #include <sys/select.h> // for select
00013 #endif
00014 #include <sys/types.h>  // for fnctl
00015 #include <fcntl.h>      // for fnctl
00016 #include <errno.h>      // for checking errno
00017 
00018 #ifndef O_NONBLOCK
00019 #define O_NONBLOCK 0 /* not actually supported in MINGW */
00020 #endif
00021 
00022 // Qt
00023 #include <QTime>
00024 
00025 // MythTV
00026 #include "mythsocketthread.h"
00027 #include "mythbaseutil.h"
00028 #include "mythlogging.h"
00029 #include "mythsocket.h"
00030 
00031 #define SLOC(a) QString("MythSocketThread(sock 0x%1:%2): ")\
00032                     .arg((uint64_t)a, 0, 16).arg(a->socket())
00033 #define LOC     QString("MythSocketThread: ")
00034 
00035 const uint MythSocketThread::kShortWait = 100;
00036 
00037 MythSocketThread::MythSocketThread()
00038     : MThread("Socket"), m_readyread_run(false)
00039 {
00040     for (int i = 0; i < 2; i++)
00041     {
00042         m_readyread_pipe[i] = -1;
00043         m_readyread_pipe_flags[i] = 0;
00044     }
00045 }
00046 
00047 void ShutdownRRT(void)
00048 {
00049     QMutexLocker locker(&MythSocket::s_readyread_thread_lock);
00050     if (MythSocket::s_readyread_thread)
00051     {
00052         MythSocket::s_readyread_thread->ShutdownReadyReadThread();
00053         MythSocket::s_readyread_thread->wait();
00054     }
00055 }
00056 
00057 void MythSocketThread::ShutdownReadyReadThread(void)
00058 {
00059     {
00060         QMutexLocker locker(&m_readyread_lock);
00061         m_readyread_run = false;
00062     }
00063 
00064     WakeReadyReadThread();
00065 
00066     wait(); // waits for thread to exit
00067 
00068     CloseReadyReadPipe();
00069 }
00070 
00071 void MythSocketThread::CloseReadyReadPipe(void) const
00072 {
00073     for (uint i = 0; i < 2; i++)
00074     {
00075         if (m_readyread_pipe[i] >= 0)
00076         {
00077             ::close(m_readyread_pipe[i]);
00078             m_readyread_pipe[i] = -1;
00079             m_readyread_pipe_flags[i] = 0;
00080         }
00081     }
00082 }
00083 
00084 void MythSocketThread::StartReadyReadThread(void)
00085 {
00086     QMutexLocker locker(&m_readyread_lock);
00087     if (!m_readyread_run)
00088     {
00089         atexit(ShutdownRRT);
00090         setup_pipe(m_readyread_pipe, m_readyread_pipe_flags);
00091         m_readyread_run = true;
00092         start();
00093         m_readyread_started_wait.wait(&m_readyread_lock);
00094     }
00095 }
00096 
00097 void MythSocketThread::AddToReadyRead(MythSocket *sock)
00098 {
00099     if (sock->socket() == -1)
00100     {
00101         LOG(VB_SOCKET, LOG_ERR, SLOC(sock) +
00102                 "attempted to insert invalid socket to ReadyRead");
00103         return;
00104     }
00105     StartReadyReadThread();
00106 
00107     sock->UpRef();
00108 
00109     {
00110         QMutexLocker locker(&m_readyread_lock);
00111         m_readyread_addlist.push_back(sock);
00112     }
00113 
00114     WakeReadyReadThread();
00115 }
00116 
00117 void MythSocketThread::RemoveFromReadyRead(MythSocket *sock)
00118 {
00119     {
00120         QMutexLocker locker(&m_readyread_lock);
00121         m_readyread_dellist.push_back(sock);
00122     }
00123     WakeReadyReadThread();
00124 }
00125 
00126 void MythSocketThread::WakeReadyReadThread(void) const
00127 {
00128     if (!isRunning())
00129         return;
00130 
00131     QMutexLocker locker(&m_readyread_lock);
00132     m_readyread_wait.wakeAll();
00133 
00134     if (m_readyread_pipe[1] < 0)
00135         return;
00136 
00137     char buf[1] = { '0' };
00138     ssize_t wret = 0;
00139     while (wret <= 0)
00140     {
00141         wret = ::write(m_readyread_pipe[1], &buf, 1);
00142         if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
00143         {
00144             LOG(VB_GENERAL, LOG_ERR, LOC +
00145                 "Failed to write to readyread pipe, closing pipe.");
00146 
00147             // Closing the pipe will cause the run loop's select to exit.
00148             // Then the next time through the loop we should fallback to
00149             // using the code for platforms that don't support pipes..
00150             CloseReadyReadPipe();
00151             break;
00152         }
00153     }
00154 }
00155 
00156 void MythSocketThread::ReadyToBeRead(MythSocket *sock)
00157 {
00158     LOG(VB_SOCKET, LOG_DEBUG, SLOC(sock) + "socket is readable");
00159     int bytesAvail = sock->bytesAvailable();
00160     
00161     if (bytesAvail == 0 && sock->closedByRemote())
00162     {
00163         LOG(VB_SOCKET, LOG_INFO, SLOC(sock) + "socket closed");
00164         sock->close();
00165     }
00166     else if (bytesAvail > 0 && sock->m_cb && sock->m_useReadyReadCallback)
00167     {
00168         sock->m_notifyread = true;
00169         LOG(VB_SOCKET, LOG_DEBUG, SLOC(sock) + "calling m_cb->readyRead()");
00170         sock->m_cb->readyRead(sock);
00171     }
00172 }
00173 
00174 void MythSocketThread::ProcessAddRemoveQueues(void)
00175 {
00176     while (!m_readyread_dellist.empty())
00177     {
00178         MythSocket *sock = m_readyread_dellist.front();
00179         m_readyread_dellist.pop_front();
00180 
00181         if (m_readyread_list.removeAll(sock))
00182             m_readyread_downref_list.push_back(sock);
00183     }
00184 
00185     while (!m_readyread_addlist.empty())
00186     {
00187         MythSocket *sock = m_readyread_addlist.front();
00188         m_readyread_addlist.pop_front();
00189         m_readyread_list.push_back(sock);
00190     }
00191 }
00192 
00193 void MythSocketThread::run(void)
00194 {
00195     RunProlog();
00196     LOG(VB_SOCKET, LOG_DEBUG, LOC + "readyread thread start");
00197 
00198     QMutexLocker locker(&m_readyread_lock);
00199     m_readyread_started_wait.wakeAll();
00200     while (m_readyread_run)
00201     {
00202         LOG(VB_SOCKET, LOG_DEBUG, LOC + "ProcessAddRemoveQueues");
00203 
00204         ProcessAddRemoveQueues();
00205 
00206         LOG(VB_SOCKET, LOG_DEBUG, LOC + "Construct FD_SET");
00207 
00208         // construct FD_SET for all connected and unlocked sockets...
00209         int maxfd = -1;
00210         fd_set rfds;
00211         FD_ZERO(&rfds);
00212 
00213         QList<MythSocket*>::const_iterator it = m_readyread_list.begin();
00214         for (; it != m_readyread_list.end(); ++it)
00215         {
00216             if (!(*it)->TryLock(false))
00217                 continue;
00218 
00219             if ((*it)->state() == MythSocket::Connected &&
00220                 !(*it)->m_notifyread)
00221             {
00222                 FD_SET((*it)->socket(), &rfds);
00223                 maxfd = std::max((*it)->socket(), maxfd);
00224             }
00225             (*it)->Unlock(false);
00226         }
00227 
00228         // There are no unlocked sockets, wait for event before we continue..
00229         if (maxfd < 0)
00230         {
00231             LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, sleeping");
00232             if (m_readyread_wait.wait(&m_readyread_lock))
00233                 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, woken up");
00234             else
00235                 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, timed out");
00236             continue;
00237         }
00238 
00239         int rval = 0;
00240 
00241         if (m_readyread_pipe[0] >= 0)
00242         {
00243             // Clear out any pending pipe reads, we have already taken care of
00244             // this event above under the m_readyread_lock.
00245             char dummy[128];
00246             if (m_readyread_pipe_flags[0] & O_NONBLOCK)
00247             {
00248                 rval = ::read(m_readyread_pipe[0], dummy, 128);
00249                 FD_SET(m_readyread_pipe[0], &rfds);
00250                 maxfd = std::max(m_readyread_pipe[0], maxfd);
00251             }
00252 
00253             // also exit select on exceptions on same descriptors
00254             fd_set efds;
00255             memcpy(&efds, &rfds, sizeof(fd_set));
00256 
00257             // The select waits forever for data, so if we need to process
00258             // anything else we need to write to m_readyread_pipe[1]..
00259             // We unlock the ready read lock, because we don't need it
00260             // and this will allow WakeReadyReadThread() to run..
00261             m_readyread_lock.unlock();
00262             LOG(VB_SOCKET, LOG_DEBUG, LOC + "Waiting on select..");
00263             rval = select(maxfd + 1, &rfds, NULL, &efds, NULL);
00264             LOG(VB_SOCKET, LOG_DEBUG, LOC + "Got data on select");
00265             m_readyread_lock.lock();
00266 
00267             if (rval > 0 && FD_ISSET(m_readyread_pipe[0], &rfds))
00268             {
00269                 int ret = ::read(m_readyread_pipe[0], dummy, 128);
00270                 if (ret < 0)
00271                 {
00272                     LOG(VB_SOCKET, LOG_ERR, LOC +
00273                         "Strange.. failed to read event pipe");
00274                 }
00275             }
00276         }
00277         else
00278         {
00279             LOG(VB_SOCKET, LOG_DEBUG, LOC + "Waiting on select.. (no pipe)");
00280 
00281             fd_set savefds;
00282             memcpy(&savefds, &rfds, sizeof(fd_set));
00283 
00284             // Unfortunately, select on a pipe is not supported on all
00285             // platforms. So we fallback to a loop that instead times out
00286             // of select and checks for wakeAll event.
00287             while (!rval)
00288             {
00289                 // also exit select on exceptions on same descriptors
00290                 fd_set efds;
00291                 memcpy(&efds, &savefds, sizeof(fd_set));
00292 
00293                 struct timeval timeout;
00294                 timeout.tv_sec = 0;
00295                 timeout.tv_usec = kShortWait * 1000;
00296                 rval = select(maxfd + 1, &rfds, NULL, &efds, &timeout);
00297                 if (!rval)
00298                 {
00299                     m_readyread_wait.wait(&m_readyread_lock, kShortWait);
00300                     memcpy(&rfds, &savefds, sizeof(fd_set));
00301                 }
00302             }
00303 
00304             if (rval > 0)
00305                 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Got data on select (no pipe)");
00306         }
00307 
00308         if (rval <= 0)
00309         {
00310             if (rval == 0)
00311             {
00312                 // Note: This should never occur when using pipes. When there
00313                 // is no error there should be data in at least one fd..
00314                 LOG(VB_SOCKET, LOG_DEBUG, LOC + "select timeout");
00315             }
00316             else
00317                 LOG(VB_SOCKET, LOG_ERR, LOC + "select returned error" + ENO);
00318 
00319             m_readyread_wait.wait(&m_readyread_lock, kShortWait);
00320             continue;
00321         }
00322         
00323         // ReadyToBeRead allows calls back into the socket so we need
00324         // to release the lock for a little while.
00325         // since only this loop updates m_readyread_list this is safe.
00326         m_readyread_lock.unlock();
00327 
00328         // Actually read some data! This is a form of co-operative
00329         // multitasking so the ready read handlers should be quick..
00330 
00331         uint downref_tm = 0;
00332         if (!m_readyread_downref_list.empty())
00333         {
00334             LOG(VB_SOCKET, LOG_DEBUG, LOC + "Deleting stale sockets");
00335 
00336             QTime tm = QTime::currentTime();
00337             for (it = m_readyread_downref_list.begin();
00338                  it != m_readyread_downref_list.end(); ++it)
00339             {
00340                 (*it)->DownRef();
00341             }
00342             m_readyread_downref_list.clear();
00343             downref_tm = tm.elapsed();
00344         }
00345 
00346         LOG(VB_SOCKET, LOG_DEBUG, LOC + "Processing ready reads");
00347 
00348         QMap<uint,uint> timers;
00349         QTime tm = QTime::currentTime();
00350         it = m_readyread_list.begin();
00351 
00352         for (; it != m_readyread_list.end() && m_readyread_run; ++it)
00353         {
00354             if (!(*it)->TryLock(false))
00355                 continue;
00356             
00357             int socket = (*it)->socket();
00358 
00359             if (socket >= 0 &&
00360                 (*it)->state() == MythSocket::Connected &&
00361                 FD_ISSET(socket, &rfds))
00362             {
00363                 QTime rrtm = QTime::currentTime();
00364                 ReadyToBeRead(*it);
00365                 timers[socket] = rrtm.elapsed();
00366             }
00367             (*it)->Unlock(false);
00368         }
00369 
00370         if (VERBOSE_LEVEL_CHECK(VB_SOCKET, LOG_DEBUG))
00371         {
00372             QString rep = QString("Total read time: %1ms, on sockets")
00373                 .arg(tm.elapsed());
00374             QMap<uint,uint>::const_iterator it = timers.begin();
00375             for (; it != timers.end(); ++it)
00376                 rep += QString(" {%1,%2ms}").arg(it.key()).arg(*it);
00377             if (downref_tm)
00378                 rep += QString(" {downref, %1ms}").arg(downref_tm);
00379 
00380             LOG(VB_SOCKET, LOG_DEBUG, LOC + rep);
00381         }
00382 
00383         m_readyread_lock.lock();
00384         LOG(VB_SOCKET, LOG_DEBUG, LOC + "Reacquired ready read lock");
00385     }
00386 
00387     LOG(VB_SOCKET, LOG_DEBUG, LOC + "readyread thread exit");
00388     RunEpilog();
00389 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends