|
MythTV
0.26-pre
|
00001 // ANSI C 00002 #include <cstdlib> 00003 00004 // C++ 00005 #include <algorithm> // for min/max 00006 //using namespace std; 00007 00008 #include "compat.h" 00009 00010 // POSIX 00011 #ifndef USING_MINGW 00012 #include <sys/select.h> // for select 00013 #endif 00014 #include <sys/types.h> // for fnctl 00015 #include <fcntl.h> // for fnctl 00016 #include <errno.h> // for checking errno 00017 00018 #ifndef O_NONBLOCK 00019 #define O_NONBLOCK 0 /* not actually supported in MINGW */ 00020 #endif 00021 00022 // Qt 00023 #include <QTime> 00024 00025 // MythTV 00026 #include "mythsocketthread.h" 00027 #include "mythbaseutil.h" 00028 #include "mythlogging.h" 00029 #include "mythsocket.h" 00030 00031 #define SLOC(a) QString("MythSocketThread(sock 0x%1:%2): ")\ 00032 .arg((uint64_t)a, 0, 16).arg(a->socket()) 00033 #define LOC QString("MythSocketThread: ") 00034 00035 const uint MythSocketThread::kShortWait = 100; 00036 00037 MythSocketThread::MythSocketThread() 00038 : MThread("Socket"), m_readyread_run(false) 00039 { 00040 for (int i = 0; i < 2; i++) 00041 { 00042 m_readyread_pipe[i] = -1; 00043 m_readyread_pipe_flags[i] = 0; 00044 } 00045 } 00046 00047 void ShutdownRRT(void) 00048 { 00049 QMutexLocker locker(&MythSocket::s_readyread_thread_lock); 00050 if (MythSocket::s_readyread_thread) 00051 { 00052 MythSocket::s_readyread_thread->ShutdownReadyReadThread(); 00053 MythSocket::s_readyread_thread->wait(); 00054 } 00055 } 00056 00057 void MythSocketThread::ShutdownReadyReadThread(void) 00058 { 00059 { 00060 QMutexLocker locker(&m_readyread_lock); 00061 m_readyread_run = false; 00062 } 00063 00064 WakeReadyReadThread(); 00065 00066 wait(); // waits for thread to exit 00067 00068 CloseReadyReadPipe(); 00069 } 00070 00071 void MythSocketThread::CloseReadyReadPipe(void) const 00072 { 00073 for (uint i = 0; i < 2; i++) 00074 { 00075 if (m_readyread_pipe[i] >= 0) 00076 { 00077 ::close(m_readyread_pipe[i]); 00078 m_readyread_pipe[i] = -1; 00079 m_readyread_pipe_flags[i] = 0; 00080 } 00081 } 00082 } 00083 00084 void MythSocketThread::StartReadyReadThread(void) 00085 { 00086 QMutexLocker locker(&m_readyread_lock); 00087 if (!m_readyread_run) 00088 { 00089 atexit(ShutdownRRT); 00090 setup_pipe(m_readyread_pipe, m_readyread_pipe_flags); 00091 m_readyread_run = true; 00092 start(); 00093 m_readyread_started_wait.wait(&m_readyread_lock); 00094 } 00095 } 00096 00097 void MythSocketThread::AddToReadyRead(MythSocket *sock) 00098 { 00099 if (sock->socket() == -1) 00100 { 00101 LOG(VB_SOCKET, LOG_ERR, SLOC(sock) + 00102 "attempted to insert invalid socket to ReadyRead"); 00103 return; 00104 } 00105 StartReadyReadThread(); 00106 00107 sock->UpRef(); 00108 00109 { 00110 QMutexLocker locker(&m_readyread_lock); 00111 m_readyread_addlist.push_back(sock); 00112 } 00113 00114 WakeReadyReadThread(); 00115 } 00116 00117 void MythSocketThread::RemoveFromReadyRead(MythSocket *sock) 00118 { 00119 { 00120 QMutexLocker locker(&m_readyread_lock); 00121 m_readyread_dellist.push_back(sock); 00122 } 00123 WakeReadyReadThread(); 00124 } 00125 00126 void MythSocketThread::WakeReadyReadThread(void) const 00127 { 00128 if (!isRunning()) 00129 return; 00130 00131 QMutexLocker locker(&m_readyread_lock); 00132 m_readyread_wait.wakeAll(); 00133 00134 if (m_readyread_pipe[1] < 0) 00135 return; 00136 00137 char buf[1] = { '0' }; 00138 ssize_t wret = 0; 00139 while (wret <= 0) 00140 { 00141 wret = ::write(m_readyread_pipe[1], &buf, 1); 00142 if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno)) 00143 { 00144 LOG(VB_GENERAL, LOG_ERR, LOC + 00145 "Failed to write to readyread pipe, closing pipe."); 00146 00147 // Closing the pipe will cause the run loop's select to exit. 00148 // Then the next time through the loop we should fallback to 00149 // using the code for platforms that don't support pipes.. 00150 CloseReadyReadPipe(); 00151 break; 00152 } 00153 } 00154 } 00155 00156 void MythSocketThread::ReadyToBeRead(MythSocket *sock) 00157 { 00158 LOG(VB_SOCKET, LOG_DEBUG, SLOC(sock) + "socket is readable"); 00159 int bytesAvail = sock->bytesAvailable(); 00160 00161 if (bytesAvail == 0 && sock->closedByRemote()) 00162 { 00163 LOG(VB_SOCKET, LOG_INFO, SLOC(sock) + "socket closed"); 00164 sock->close(); 00165 } 00166 else if (bytesAvail > 0 && sock->m_cb && sock->m_useReadyReadCallback) 00167 { 00168 sock->m_notifyread = true; 00169 LOG(VB_SOCKET, LOG_DEBUG, SLOC(sock) + "calling m_cb->readyRead()"); 00170 sock->m_cb->readyRead(sock); 00171 } 00172 } 00173 00174 void MythSocketThread::ProcessAddRemoveQueues(void) 00175 { 00176 while (!m_readyread_dellist.empty()) 00177 { 00178 MythSocket *sock = m_readyread_dellist.front(); 00179 m_readyread_dellist.pop_front(); 00180 00181 if (m_readyread_list.removeAll(sock)) 00182 m_readyread_downref_list.push_back(sock); 00183 } 00184 00185 while (!m_readyread_addlist.empty()) 00186 { 00187 MythSocket *sock = m_readyread_addlist.front(); 00188 m_readyread_addlist.pop_front(); 00189 m_readyread_list.push_back(sock); 00190 } 00191 } 00192 00193 void MythSocketThread::run(void) 00194 { 00195 RunProlog(); 00196 LOG(VB_SOCKET, LOG_DEBUG, LOC + "readyread thread start"); 00197 00198 QMutexLocker locker(&m_readyread_lock); 00199 m_readyread_started_wait.wakeAll(); 00200 while (m_readyread_run) 00201 { 00202 LOG(VB_SOCKET, LOG_DEBUG, LOC + "ProcessAddRemoveQueues"); 00203 00204 ProcessAddRemoveQueues(); 00205 00206 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Construct FD_SET"); 00207 00208 // construct FD_SET for all connected and unlocked sockets... 00209 int maxfd = -1; 00210 fd_set rfds; 00211 FD_ZERO(&rfds); 00212 00213 QList<MythSocket*>::const_iterator it = m_readyread_list.begin(); 00214 for (; it != m_readyread_list.end(); ++it) 00215 { 00216 if (!(*it)->TryLock(false)) 00217 continue; 00218 00219 if ((*it)->state() == MythSocket::Connected && 00220 !(*it)->m_notifyread) 00221 { 00222 FD_SET((*it)->socket(), &rfds); 00223 maxfd = std::max((*it)->socket(), maxfd); 00224 } 00225 (*it)->Unlock(false); 00226 } 00227 00228 // There are no unlocked sockets, wait for event before we continue.. 00229 if (maxfd < 0) 00230 { 00231 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, sleeping"); 00232 if (m_readyread_wait.wait(&m_readyread_lock)) 00233 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, woken up"); 00234 else 00235 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, timed out"); 00236 continue; 00237 } 00238 00239 int rval = 0; 00240 00241 if (m_readyread_pipe[0] >= 0) 00242 { 00243 // Clear out any pending pipe reads, we have already taken care of 00244 // this event above under the m_readyread_lock. 00245 char dummy[128]; 00246 if (m_readyread_pipe_flags[0] & O_NONBLOCK) 00247 { 00248 rval = ::read(m_readyread_pipe[0], dummy, 128); 00249 FD_SET(m_readyread_pipe[0], &rfds); 00250 maxfd = std::max(m_readyread_pipe[0], maxfd); 00251 } 00252 00253 // also exit select on exceptions on same descriptors 00254 fd_set efds; 00255 memcpy(&efds, &rfds, sizeof(fd_set)); 00256 00257 // The select waits forever for data, so if we need to process 00258 // anything else we need to write to m_readyread_pipe[1].. 00259 // We unlock the ready read lock, because we don't need it 00260 // and this will allow WakeReadyReadThread() to run.. 00261 m_readyread_lock.unlock(); 00262 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Waiting on select.."); 00263 rval = select(maxfd + 1, &rfds, NULL, &efds, NULL); 00264 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Got data on select"); 00265 m_readyread_lock.lock(); 00266 00267 if (rval > 0 && FD_ISSET(m_readyread_pipe[0], &rfds)) 00268 { 00269 int ret = ::read(m_readyread_pipe[0], dummy, 128); 00270 if (ret < 0) 00271 { 00272 LOG(VB_SOCKET, LOG_ERR, LOC + 00273 "Strange.. failed to read event pipe"); 00274 } 00275 } 00276 } 00277 else 00278 { 00279 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Waiting on select.. (no pipe)"); 00280 00281 fd_set savefds; 00282 memcpy(&savefds, &rfds, sizeof(fd_set)); 00283 00284 // Unfortunately, select on a pipe is not supported on all 00285 // platforms. So we fallback to a loop that instead times out 00286 // of select and checks for wakeAll event. 00287 while (!rval) 00288 { 00289 // also exit select on exceptions on same descriptors 00290 fd_set efds; 00291 memcpy(&efds, &savefds, sizeof(fd_set)); 00292 00293 struct timeval timeout; 00294 timeout.tv_sec = 0; 00295 timeout.tv_usec = kShortWait * 1000; 00296 rval = select(maxfd + 1, &rfds, NULL, &efds, &timeout); 00297 if (!rval) 00298 { 00299 m_readyread_wait.wait(&m_readyread_lock, kShortWait); 00300 memcpy(&rfds, &savefds, sizeof(fd_set)); 00301 } 00302 } 00303 00304 if (rval > 0) 00305 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Got data on select (no pipe)"); 00306 } 00307 00308 if (rval <= 0) 00309 { 00310 if (rval == 0) 00311 { 00312 // Note: This should never occur when using pipes. When there 00313 // is no error there should be data in at least one fd.. 00314 LOG(VB_SOCKET, LOG_DEBUG, LOC + "select timeout"); 00315 } 00316 else 00317 LOG(VB_SOCKET, LOG_ERR, LOC + "select returned error" + ENO); 00318 00319 m_readyread_wait.wait(&m_readyread_lock, kShortWait); 00320 continue; 00321 } 00322 00323 // ReadyToBeRead allows calls back into the socket so we need 00324 // to release the lock for a little while. 00325 // since only this loop updates m_readyread_list this is safe. 00326 m_readyread_lock.unlock(); 00327 00328 // Actually read some data! This is a form of co-operative 00329 // multitasking so the ready read handlers should be quick.. 00330 00331 uint downref_tm = 0; 00332 if (!m_readyread_downref_list.empty()) 00333 { 00334 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Deleting stale sockets"); 00335 00336 QTime tm = QTime::currentTime(); 00337 for (it = m_readyread_downref_list.begin(); 00338 it != m_readyread_downref_list.end(); ++it) 00339 { 00340 (*it)->DownRef(); 00341 } 00342 m_readyread_downref_list.clear(); 00343 downref_tm = tm.elapsed(); 00344 } 00345 00346 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Processing ready reads"); 00347 00348 QMap<uint,uint> timers; 00349 QTime tm = QTime::currentTime(); 00350 it = m_readyread_list.begin(); 00351 00352 for (; it != m_readyread_list.end() && m_readyread_run; ++it) 00353 { 00354 if (!(*it)->TryLock(false)) 00355 continue; 00356 00357 int socket = (*it)->socket(); 00358 00359 if (socket >= 0 && 00360 (*it)->state() == MythSocket::Connected && 00361 FD_ISSET(socket, &rfds)) 00362 { 00363 QTime rrtm = QTime::currentTime(); 00364 ReadyToBeRead(*it); 00365 timers[socket] = rrtm.elapsed(); 00366 } 00367 (*it)->Unlock(false); 00368 } 00369 00370 if (VERBOSE_LEVEL_CHECK(VB_SOCKET, LOG_DEBUG)) 00371 { 00372 QString rep = QString("Total read time: %1ms, on sockets") 00373 .arg(tm.elapsed()); 00374 QMap<uint,uint>::const_iterator it = timers.begin(); 00375 for (; it != timers.end(); ++it) 00376 rep += QString(" {%1,%2ms}").arg(it.key()).arg(*it); 00377 if (downref_tm) 00378 rep += QString(" {downref, %1ms}").arg(downref_tm); 00379 00380 LOG(VB_SOCKET, LOG_DEBUG, LOC + rep); 00381 } 00382 00383 m_readyread_lock.lock(); 00384 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Reacquired ready read lock"); 00385 } 00386 00387 LOG(VB_SOCKET, LOG_DEBUG, LOC + "readyread thread exit"); 00388 RunEpilog(); 00389 }
1.7.6.1