|
MythTV
0.26-pre
|
00001 00002 // Program Name: bufferedsocketdevice.cpp 00003 // Created : Oct. 1, 2005 00004 // 00005 // Purpose : 00006 // 00007 // Copyright (c) 2005 David Blain <dblain@mythtv.org> 00008 // 00009 // Licensed under the GPL v2 or later, see COPYING for details 00010 // 00012 00013 #include <algorithm> 00014 00015 #include "mythtimer.h" 00016 #include "bufferedsocketdevice.h" 00017 #include "upnputil.h" 00018 #include "mythlogging.h" 00019 00021 // 00023 00024 BufferedSocketDevice::BufferedSocketDevice( int nSocket ) 00025 { 00026 m_pSocket = new MSocketDevice(); 00027 00028 m_pSocket->setSocket ( nSocket, MSocketDevice::Stream ); 00029 m_pSocket->setBlocking ( false ); 00030 m_pSocket->setAddressReusable( true ); 00031 00032 struct linger ling = {1, 1}; 00033 00034 if ( setsockopt(socket(), SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) < 0) 00035 LOG(VB_GENERAL, LOG_ERR, 00036 "BufferedSocketDevice: setsockopt - SO_LINGER: " + ENO); 00037 00038 m_nDestPort = 0; 00039 00040 m_nMaxReadBufferSize = 0; 00041 m_nWriteSize = 0; 00042 m_nWriteIndex = 0; 00043 m_bHandleSocketDelete= true; 00044 } 00045 00047 // 00049 00050 BufferedSocketDevice::BufferedSocketDevice( MSocketDevice *pSocket /*= NULL*/, 00051 bool bTakeOwnership /* = false */ ) 00052 { 00053 m_pSocket = pSocket; 00054 00055 m_nDestPort = 0; 00056 00057 m_nMaxReadBufferSize = 0; 00058 m_nWriteSize = 0; 00059 m_nWriteIndex = 0; 00060 m_bHandleSocketDelete= bTakeOwnership; 00061 00062 } 00063 00065 // 00067 00068 BufferedSocketDevice::~BufferedSocketDevice() 00069 { 00070 Close(); 00071 } 00072 00074 // 00076 00077 void BufferedSocketDevice::Close() 00078 { 00079 Flush(); 00080 ReadBytes(); 00081 00082 m_bufRead.clear(); 00083 ClearPendingData(); 00084 00085 if (m_pSocket != NULL) 00086 { 00087 if (m_pSocket->isValid()) 00088 m_pSocket->close(); 00089 00090 if (m_bHandleSocketDelete) 00091 delete m_pSocket; 00092 00093 m_pSocket = NULL; 00094 } 00095 00096 } 00097 00099 // 00101 00102 bool BufferedSocketDevice::Connect( const QHostAddress &addr, quint16 port ) 00103 { 00104 if (m_pSocket == NULL) 00105 return false; 00106 00107 return m_pSocket->connect( addr, port ); 00108 } 00109 00111 // 00113 00114 MSocketDevice *BufferedSocketDevice::SocketDevice() 00115 { 00116 return( m_pSocket ); 00117 } 00118 00120 // 00122 00123 void BufferedSocketDevice::SetSocketDevice( MSocketDevice *pSocket ) 00124 { 00125 if ((m_bHandleSocketDelete) && (m_pSocket != NULL)) 00126 delete m_pSocket; 00127 00128 m_bHandleSocketDelete = false; 00129 00130 m_pSocket = pSocket; 00131 } 00132 00134 // 00136 00137 void BufferedSocketDevice::SetDestAddress( 00138 QHostAddress hostAddress, quint16 nPort) 00139 { 00140 m_DestHostAddress = hostAddress; 00141 m_nDestPort = nPort; 00142 } 00143 00145 // 00147 00148 void BufferedSocketDevice::SetReadBufferSize( qulonglong bufSize ) 00149 { 00150 m_nMaxReadBufferSize = bufSize; 00151 } 00152 00154 // 00156 00157 qulonglong BufferedSocketDevice::ReadBufferSize(void) const 00158 { 00159 return m_nMaxReadBufferSize; 00160 } 00161 00163 // 00165 00166 int BufferedSocketDevice::ReadBytes() 00167 { 00168 if (m_pSocket == NULL) 00169 return m_bufRead.size(); 00170 00171 qlonglong maxToRead = 0; 00172 00173 if ( m_nMaxReadBufferSize > 0 ) 00174 { 00175 maxToRead = m_nMaxReadBufferSize - m_bufRead.size(); 00176 00177 if ( maxToRead <= 0 ) 00178 return m_bufRead.size(); 00179 } 00180 00181 qlonglong nbytes = m_pSocket->bytesAvailable(); 00182 qlonglong nread; 00183 00184 QByteArray *a = 0; 00185 00186 if ( nbytes > 0 ) 00187 { 00188 a = new QByteArray(); 00189 a->resize(nbytes); 00190 00191 nread = m_pSocket->readBlock( 00192 a->data(), maxToRead ? std::min(nbytes, maxToRead) : nbytes); 00193 00194 if (( nread > 0 ) && ( nread != (int)a->size() )) 00195 { 00196 // unexpected 00197 a->resize( nread ); 00198 } 00199 } 00200 00201 if (a) 00202 { 00203 #if 0 00204 QString msg; 00205 for( long n = 0; n < a->count(); n++ ) 00206 msg += QString("%1").arg(a->at(n)); 00207 LOG(VB_GENERAL, LOG_DEBUG, msg); 00208 #endif 00209 00210 m_bufRead.append( a ); 00211 } 00212 00213 return m_bufRead.size(); 00214 } 00215 00217 // 00219 00220 bool BufferedSocketDevice::ConsumeWriteBuf( qulonglong nbytes ) 00221 { 00222 if ( !nbytes || ((qlonglong)nbytes > m_nWriteSize) ) 00223 return false; 00224 00225 m_nWriteSize -= nbytes; 00226 00227 for ( ;; ) 00228 { 00229 QByteArray *a = m_bufWrite.front(); 00230 00231 if ( m_nWriteIndex + nbytes >= (qulonglong)a->size() ) 00232 { 00233 nbytes -= a->size() - m_nWriteIndex; 00234 m_bufWrite.pop_front(); 00235 delete a; 00236 00237 m_nWriteIndex = 0; 00238 00239 if ( nbytes == 0 ) 00240 break; 00241 } 00242 else 00243 { 00244 m_nWriteIndex += nbytes; 00245 break; 00246 } 00247 } 00248 00249 return true; 00250 } 00251 00253 // 00255 00256 void BufferedSocketDevice::Flush() 00257 { 00258 00259 if ((m_pSocket == NULL) || !m_pSocket->isValid()) 00260 return; 00261 00262 bool osBufferFull = false; 00263 int consumed = 0; 00264 00265 while ( !osBufferFull && ( m_nWriteSize > 0 ) && m_pSocket->isValid()) 00266 { 00267 deque<QByteArray*>::iterator it = m_bufWrite.begin(); 00268 QByteArray *a = *it; 00269 00270 int nwritten = 0; 00271 int i = 0; 00272 00273 if ( (int)a->size() - m_nWriteIndex < 1460 ) 00274 { 00275 QByteArray out; 00276 out.resize(65536); 00277 00278 int j = m_nWriteIndex; 00279 int s = a->size() - j; 00280 00281 while ( a && i+s < (int)out.size() ) 00282 { 00283 memcpy( out.data()+i, a->data()+j, s ); 00284 j = 0; 00285 i += s; 00286 ++it; 00287 a = *it; 00288 s = a ? a->size() : 0; 00289 } 00290 00291 if (m_nDestPort != 0) 00292 nwritten = m_pSocket->writeBlock( out.data(), i, m_DestHostAddress, m_nDestPort ); 00293 else 00294 nwritten = m_pSocket->writeBlock( out.data(), i ); 00295 } 00296 else 00297 { 00298 // Big block, write it immediately 00299 i = a->size() - m_nWriteIndex; 00300 00301 if (m_nDestPort != 0) 00302 nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i, m_DestHostAddress, m_nDestPort ); 00303 else 00304 nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i ); 00305 } 00306 00307 if ( nwritten > 0 ) 00308 { 00309 if ( ConsumeWriteBuf( nwritten ) ) 00310 00311 consumed += nwritten; 00312 } 00313 00314 if ( nwritten < i ) 00315 osBufferFull = true; 00316 } 00317 } 00318 00319 00321 // 00323 00324 qlonglong BufferedSocketDevice::Size() 00325 { 00326 return (qlonglong)BytesAvailable(); 00327 } 00328 00330 // 00332 00333 qlonglong BufferedSocketDevice::At() const 00334 { 00335 return( 0 ); 00336 } 00337 00339 // 00341 00342 bool BufferedSocketDevice::At( qlonglong index ) 00343 { 00344 ReadBytes(); 00345 00346 if ( index > m_bufRead.size() ) 00347 return false; 00348 00349 // throw away data 0..index-1 00350 m_bufRead.consumeBytes( (qulonglong)index, 0 ); 00351 00352 return true; 00353 } 00354 00356 // 00358 00359 bool BufferedSocketDevice::AtEnd() 00360 { 00361 if ( !m_pSocket->isValid() ) 00362 return true; 00363 00364 ReadBytes(); 00365 00366 return m_bufRead.size() == 0; 00367 00368 } 00369 00371 // 00373 00374 qulonglong BufferedSocketDevice::BytesAvailable(void) 00375 { 00376 if ( !m_pSocket->isValid() ) 00377 return 0; 00378 00379 return ReadBytes(); 00380 } 00381 00383 // 00385 00386 qulonglong BufferedSocketDevice::WaitForMore( 00387 int msecs, bool *pTimeout /* = NULL*/ ) 00388 { 00389 bool bTimeout = false; 00390 00391 if ( !m_pSocket->isValid() ) 00392 return 0; 00393 00394 qlonglong nBytes = BytesAvailable(); 00395 00396 if (nBytes == 0) 00397 { 00398 /* 00399 The following code is a possible workaround to the lost request problem 00400 I just hate looping too much to put it in. I believe there is something 00401 I'm missing that is causing the lost packets... Just need to find it. 00402 00403 bTimeout = true; 00404 int nCount = 0; 00405 int msWait = msecs / 100; 00406 00407 while (((nBytes = ReadBytes()) == 0 ) && 00408 (nCount++ < 100 ) && 00409 bTimeout && 00410 m_pSocket->isValid() ) 00411 { 00412 // give up control 00413 00414 usleep( 1000 ); // should be some multiple of msWait. 00415 00416 } 00417 } 00418 */ 00419 // -=>TODO: Override the timeout to 1 second... Closes connection sooner 00420 // to help recover from lost requests. (hack until better fix found) 00421 00422 msecs = 1000; 00423 00424 nBytes = m_pSocket->waitForMore( msecs, &bTimeout ); 00425 00426 if (pTimeout != NULL) 00427 *pTimeout = bTimeout; 00428 } 00429 00430 return nBytes; // nBytes //m_bufRead.size(); 00431 } 00432 00434 // 00436 00437 qulonglong BufferedSocketDevice::BytesToWrite(void) const 00438 { 00439 return m_nWriteSize; 00440 00441 } 00442 00444 // 00446 00447 void BufferedSocketDevice::ClearPendingData() 00448 { 00449 while (!m_bufWrite.empty()) 00450 { 00451 delete m_bufWrite.back(); 00452 m_bufWrite.pop_back(); 00453 } 00454 m_nWriteIndex = m_nWriteSize = 0; 00455 } 00456 00458 // 00460 00461 void BufferedSocketDevice::ClearReadBuffer() 00462 { 00463 m_bufRead.clear(); 00464 } 00465 00467 // 00469 00470 qlonglong BufferedSocketDevice::ReadBlock( char *data, qulonglong maxlen ) 00471 { 00472 if ( data == 0 && maxlen != 0 ) 00473 return -1; 00474 00475 if ( !m_pSocket->isOpen() ) 00476 return -1; 00477 00478 ReadBytes(); 00479 00480 if ( maxlen >= (qulonglong)m_bufRead.size() ) 00481 maxlen = m_bufRead.size(); 00482 00483 m_bufRead.consumeBytes( maxlen, data ); 00484 00485 return maxlen; 00486 00487 } 00488 00490 // 00492 00493 qlonglong BufferedSocketDevice::WriteBlock( const char *data, qulonglong len ) 00494 { 00495 if ( len == 0 ) 00496 return 0; 00497 00498 QByteArray *a = m_bufWrite.back(); 00499 00500 bool writeNow = ( (m_nWriteSize + len >= 1400) || (len > 512) ); 00501 00502 if ( a && (a->size() + len < 128) ) 00503 { 00504 // small buffer, resize 00505 int i = a->size(); 00506 00507 a->resize( i+len ); 00508 memcpy( a->data()+i, data, len ); 00509 } 00510 else 00511 { 00512 // append new buffer 00513 m_bufWrite.push_back(new QByteArray(data, len)); 00514 } 00515 00516 m_nWriteSize += len; 00517 00518 if ( writeNow ) 00519 Flush(); 00520 00521 return len; 00522 } 00523 00525 // 00527 00528 qlonglong BufferedSocketDevice::WriteBlockDirect( 00529 const char *data, qulonglong len) 00530 { 00531 qlonglong nWritten = 0; 00532 00533 // must Flush data just in case caller is mixing buffered & un-buffered calls 00534 00535 Flush(); 00536 00537 if (m_nDestPort != 0) 00538 nWritten = m_pSocket->writeBlock( data, len, m_DestHostAddress, m_nDestPort ); 00539 else 00540 nWritten = m_pSocket->writeBlock( data, len ); 00541 00542 return nWritten; 00543 } 00544 00546 // 00548 00549 int BufferedSocketDevice::Getch() 00550 { 00551 if ( m_pSocket->isOpen() ) 00552 { 00553 ReadBytes(); 00554 00555 if (m_bufRead.size() > 0 ) 00556 { 00557 uchar c; 00558 00559 m_bufRead.consumeBytes( 1, (char*)&c ); 00560 00561 return c; 00562 } 00563 } 00564 00565 return -1; 00566 } 00567 00569 // 00571 00572 int BufferedSocketDevice::Putch( int ch ) 00573 { 00574 char buf[2]; 00575 00576 buf[0] = ch; 00577 00578 return WriteBlock(buf, 1) == 1 ? ch : -1; 00579 } 00580 00582 // 00584 00585 int BufferedSocketDevice::Ungetch(int ch ) 00586 { 00587 return m_bufRead.ungetch( ch ); 00588 } 00589 00591 // 00593 00594 bool BufferedSocketDevice::CanReadLine() 00595 { 00596 ReadBytes(); 00597 00598 if (( BytesAvailable() > 0 ) && m_bufRead.scanNewline( 0 ) ) 00599 return true; 00600 00601 return false; 00602 } 00603 00605 // 00607 00608 QString BufferedSocketDevice::ReadLine() 00609 { 00610 QByteArray a; 00611 a.resize(256); 00612 00613 ReadBytes(); 00614 00615 bool nl = m_bufRead.scanNewline( &a ); 00616 00617 QString s; 00618 00619 if ( nl ) 00620 { 00621 At( a.size() ); // skips the data read 00622 00623 s = QString( a ); 00624 } 00625 00626 return s; 00627 } 00628 00630 // 00632 00633 QString BufferedSocketDevice::ReadLine( int msecs ) 00634 { 00635 MythTimer timer; 00636 QString sLine; 00637 00638 if ( CanReadLine() ) 00639 return( ReadLine() ); 00640 00641 // ---------------------------------------------------------------------- 00642 // If the user supplied a timeout, lets loop until we can read a line 00643 // or timeout. 00644 // ---------------------------------------------------------------------- 00645 00646 if ( msecs > 0) 00647 { 00648 bool bTimeout = false; 00649 00650 timer.start(); 00651 00652 while ( !CanReadLine() && !bTimeout ) 00653 { 00654 #if 0 00655 LOG(VB_UPNP, LOG_DEBUG, "Can't Read Line... Waiting for more." ); 00656 #endif 00657 00658 WaitForMore( msecs, &bTimeout ); 00659 00660 if ( timer.elapsed() >= msecs ) 00661 { 00662 bTimeout = true; 00663 LOG(VB_UPNP, LOG_INFO, "Exceeded Total Elapsed Wait Time." ); 00664 } 00665 } 00666 00667 if (CanReadLine()) 00668 sLine = ReadLine(); 00669 } 00670 00671 return( sLine ); 00672 } 00673 00675 // 00677 00678 quint16 BufferedSocketDevice::Port(void) const 00679 { 00680 if (m_pSocket) 00681 return( m_pSocket->port() ); 00682 00683 return 0; 00684 } 00685 00687 // 00689 00690 quint16 BufferedSocketDevice::PeerPort(void) const 00691 { 00692 if (m_pSocket) 00693 return( m_pSocket->peerPort() ); 00694 00695 return 0; 00696 } 00697 00699 // 00701 00702 QHostAddress BufferedSocketDevice::Address() const 00703 { 00704 if (m_pSocket) 00705 return( m_pSocket->address() ); 00706 00707 QHostAddress tmp; 00708 00709 return tmp; 00710 } 00711 00713 // 00715 00716 QHostAddress BufferedSocketDevice::PeerAddress() const 00717 { 00718 if (m_pSocket) 00719 return( m_pSocket->peerAddress() ); 00720 00721 QHostAddress tmp; 00722 00723 return tmp; 00724 } 00725
1.7.6.1