MythTV  0.26-pre
bufferedsocketdevice.cpp
Go to the documentation of this file.
00001 
00002 // Program Name: bufferedsocketdevice.cpp
00003 // Created     : Oct. 1, 2005
00004 //
00005 // Purpose     : 
00006 //                                                                            
00007 // Copyright (c) 2005 David Blain <dblain@mythtv.org>
00008 //                                          
00009 // Licensed under the GPL v2 or later, see COPYING for details                    
00010 //
00012 
00013 #include <algorithm>
00014 
00015 #include "mythtimer.h"
00016 #include "bufferedsocketdevice.h"
00017 #include "upnputil.h"
00018 #include "mythlogging.h"
00019 
00021 //
00023 
00024 BufferedSocketDevice::BufferedSocketDevice( int nSocket  )
00025 {
00026     m_pSocket = new MSocketDevice();
00027 
00028     m_pSocket->setSocket         ( nSocket, MSocketDevice::Stream );
00029     m_pSocket->setBlocking       ( false );
00030     m_pSocket->setAddressReusable( true );
00031 
00032     struct linger ling = {1, 1};
00033 
00034     if ( setsockopt(socket(), SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) < 0) 
00035         LOG(VB_GENERAL, LOG_ERR, 
00036             "BufferedSocketDevice: setsockopt - SO_LINGER: " + ENO);
00037 
00038     m_nDestPort          = 0;
00039 
00040     m_nMaxReadBufferSize = 0; 
00041     m_nWriteSize         = 0;
00042     m_nWriteIndex        = 0;
00043     m_bHandleSocketDelete= true;
00044 }
00045 
00047 //
00049 
00050 BufferedSocketDevice::BufferedSocketDevice( MSocketDevice *pSocket /*= NULL*/,
00051                                             bool bTakeOwnership /* = false */ )
00052 {
00053     m_pSocket            = pSocket;
00054 
00055     m_nDestPort          = 0;
00056 
00057     m_nMaxReadBufferSize = 0; 
00058     m_nWriteSize         = 0;
00059     m_nWriteIndex        = 0;
00060     m_bHandleSocketDelete= bTakeOwnership;
00061 
00062 }
00063 
00065 //
00067 
00068 BufferedSocketDevice::~BufferedSocketDevice()
00069 {
00070     Close();
00071 }
00072 
00074 //
00076 
00077 void BufferedSocketDevice::Close()
00078 {
00079     Flush();
00080     ReadBytes();
00081 
00082     m_bufRead.clear();
00083     ClearPendingData();
00084 
00085     if (m_pSocket != NULL)
00086     {
00087         if (m_pSocket->isValid())
00088             m_pSocket->close();
00089 
00090         if (m_bHandleSocketDelete)
00091             delete m_pSocket;
00092 
00093         m_pSocket = NULL;
00094     }
00095 
00096 }
00097 
00099 //
00101 
00102 bool BufferedSocketDevice::Connect( const QHostAddress &addr, quint16 port )
00103 {
00104     if (m_pSocket == NULL)
00105         return false;
00106 
00107     return m_pSocket->connect( addr, port );
00108 }
00109 
00111 //
00113 
00114 MSocketDevice *BufferedSocketDevice::SocketDevice()
00115 {
00116     return( m_pSocket );
00117 }
00118 
00120 //
00122 
00123 void BufferedSocketDevice::SetSocketDevice( MSocketDevice *pSocket )
00124 {
00125     if ((m_bHandleSocketDelete) && (m_pSocket != NULL))
00126         delete m_pSocket;
00127     
00128     m_bHandleSocketDelete = false;
00129 
00130     m_pSocket = pSocket;
00131 }
00132 
00134 //
00136 
00137 void BufferedSocketDevice::SetDestAddress(
00138     QHostAddress hostAddress, quint16 nPort)
00139 {
00140     m_DestHostAddress = hostAddress;
00141     m_nDestPort       = nPort;
00142 }
00143 
00145 //
00147 
00148 void BufferedSocketDevice::SetReadBufferSize( qulonglong bufSize )
00149 {
00150     m_nMaxReadBufferSize = bufSize;
00151 }
00152 
00154 //
00156 
00157 qulonglong BufferedSocketDevice::ReadBufferSize(void) const
00158 {
00159     return m_nMaxReadBufferSize;
00160 }
00161 
00163 //
00165 
00166 int BufferedSocketDevice::ReadBytes()
00167 {
00168     if (m_pSocket == NULL)
00169         return m_bufRead.size();
00170 
00171     qlonglong maxToRead = 0;
00172 
00173     if ( m_nMaxReadBufferSize > 0 ) 
00174     {
00175         maxToRead = m_nMaxReadBufferSize - m_bufRead.size();
00176 
00177         if ( maxToRead <= 0 ) 
00178             return m_bufRead.size();
00179     }
00180 
00181     qlonglong nbytes = m_pSocket->bytesAvailable();
00182     qlonglong nread;
00183 
00184     QByteArray *a = 0;
00185 
00186     if ( nbytes > 0 )
00187     {
00188         a = new QByteArray();
00189         a->resize(nbytes);
00190 
00191         nread = m_pSocket->readBlock(
00192             a->data(), maxToRead ? std::min(nbytes, maxToRead) : nbytes);
00193 
00194         if (( nread > 0 ) && ( nread != (int)a->size() ))
00195         {
00196             // unexpected
00197             a->resize( nread );
00198         }
00199     }
00200 
00201     if (a)
00202     {
00203 #if 0
00204         QString msg;
00205         for( long n = 0; n < a->count(); n++ )
00206             msg += QString("%1").arg(a->at(n));
00207         LOG(VB_GENERAL, LOG_DEBUG, msg);
00208 #endif
00209 
00210         m_bufRead.append( a );
00211     }
00212 
00213     return m_bufRead.size();
00214 }
00215 
00217 //
00219 
00220 bool BufferedSocketDevice::ConsumeWriteBuf( qulonglong nbytes )
00221 {
00222     if ( !nbytes || ((qlonglong)nbytes > m_nWriteSize) )
00223         return false;
00224 
00225     m_nWriteSize -= nbytes;
00226 
00227     for ( ;; ) 
00228     {
00229         QByteArray *a = m_bufWrite.front();
00230 
00231         if ( m_nWriteIndex + nbytes >= (qulonglong)a->size() ) 
00232         {
00233             nbytes -= a->size() - m_nWriteIndex;
00234             m_bufWrite.pop_front();
00235             delete a;
00236 
00237             m_nWriteIndex = 0;
00238 
00239             if ( nbytes == 0 )
00240                 break;
00241         } 
00242         else 
00243         {
00244             m_nWriteIndex += nbytes;
00245             break;
00246         }
00247     }
00248 
00249     return true;
00250 }
00251 
00253 //
00255 
00256 void BufferedSocketDevice::Flush()
00257 {
00258 
00259     if ((m_pSocket == NULL) || !m_pSocket->isValid())
00260         return;
00261 
00262     bool osBufferFull = false;
00263     int  consumed     = 0;
00264 
00265     while ( !osBufferFull && ( m_nWriteSize > 0 ) && m_pSocket->isValid())
00266     {
00267         deque<QByteArray*>::iterator it = m_bufWrite.begin();
00268         QByteArray *a = *it;
00269 
00270         int nwritten = 0;
00271         int i = 0;
00272 
00273         if ( (int)a->size() - m_nWriteIndex < 1460 ) 
00274         {
00275             QByteArray out;
00276             out.resize(65536);
00277 
00278             int j = m_nWriteIndex;
00279             int s = a->size() - j;
00280 
00281             while ( a && i+s < (int)out.size() ) 
00282             {
00283                 memcpy( out.data()+i, a->data()+j, s );
00284                 j = 0;
00285                 i += s;
00286                 ++it;
00287                 a = *it;
00288                 s = a ? a->size() : 0;
00289             }
00290 
00291             if (m_nDestPort != 0)
00292                 nwritten = m_pSocket->writeBlock( out.data(), i, m_DestHostAddress, m_nDestPort );
00293             else
00294                 nwritten = m_pSocket->writeBlock( out.data(), i );
00295         } 
00296         else 
00297         {
00298             // Big block, write it immediately
00299             i = a->size() - m_nWriteIndex;
00300 
00301             if (m_nDestPort != 0)
00302                 nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i, m_DestHostAddress, m_nDestPort );
00303             else
00304                 nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i );
00305         }
00306 
00307         if ( nwritten > 0 ) 
00308         {
00309             if ( ConsumeWriteBuf( nwritten ) )
00310 
00311             consumed += nwritten;
00312         }
00313 
00314         if ( nwritten < i )
00315             osBufferFull = true;
00316     }
00317 }
00318 
00319 
00321 //
00323 
00324 qlonglong BufferedSocketDevice::Size()
00325 {
00326     return (qlonglong)BytesAvailable();
00327 }
00328 
00330 //
00332 
00333 qlonglong BufferedSocketDevice::At() const
00334 {
00335     return( 0 );
00336 }
00337 
00339 //
00341 
00342 bool BufferedSocketDevice::At( qlonglong index )
00343 {
00344     ReadBytes();
00345 
00346     if ( index > m_bufRead.size() )
00347         return false;
00348 
00349     // throw away data 0..index-1
00350     m_bufRead.consumeBytes( (qulonglong)index, 0 );
00351 
00352     return true;
00353 }
00354 
00356 //
00358 
00359 bool BufferedSocketDevice::AtEnd()
00360 {
00361     if ( !m_pSocket->isValid() )
00362         return true;
00363 
00364     ReadBytes();
00365 
00366     return m_bufRead.size() == 0;
00367 
00368 }
00369 
00371 //
00373 
00374 qulonglong BufferedSocketDevice::BytesAvailable(void)
00375 {
00376     if ( !m_pSocket->isValid() )
00377         return 0;
00378 
00379     return ReadBytes();
00380 }
00381 
00383 //
00385 
00386 qulonglong BufferedSocketDevice::WaitForMore(
00387     int msecs, bool *pTimeout /* = NULL*/ ) 
00388 {
00389     bool bTimeout = false;
00390 
00391     if ( !m_pSocket->isValid() )
00392         return 0;
00393 
00394     qlonglong nBytes = BytesAvailable();
00395 
00396     if (nBytes == 0)
00397     {
00398 /*
00399         The following code is a possible workaround to the lost request problem
00400         I just hate looping too much to put it in.  I believe there is something
00401         I'm missing that is causing the lost packets... Just need to find it.
00402 
00403         bTimeout      = true;
00404         int    nCount = 0;
00405         int    msWait = msecs / 100;
00406         
00407         while (((nBytes = ReadBytes()) == 0 ) && 
00408                (nCount++              < 100 ) &&
00409                 bTimeout                      && 
00410                 m_pSocket->isValid()         )
00411         {
00412             // give up control
00413 
00414             usleep( 1000 );  // should be some multiple of msWait.
00415 
00416         }
00417     }
00418 */
00419         // -=>TODO: Override the timeout to 1 second... Closes connection sooner
00420         //          to help recover from lost requests.  (hack until better fix found)
00421 
00422         msecs  = 1000;
00423 
00424         nBytes = m_pSocket->waitForMore( msecs, &bTimeout );
00425 
00426         if (pTimeout != NULL)
00427             *pTimeout = bTimeout;
00428     }
00429             
00430     return nBytes; // nBytes  //m_bufRead.size();
00431 }
00432 
00434 //
00436 
00437 qulonglong BufferedSocketDevice::BytesToWrite(void) const
00438 {
00439     return m_nWriteSize;
00440 
00441 }
00442 
00444 //
00446 
00447 void BufferedSocketDevice::ClearPendingData()
00448 {
00449     while (!m_bufWrite.empty())
00450     {
00451         delete m_bufWrite.back();
00452         m_bufWrite.pop_back();
00453     }
00454     m_nWriteIndex = m_nWriteSize = 0;
00455 }
00456 
00458 //
00460 
00461 void BufferedSocketDevice::ClearReadBuffer()
00462 {
00463     m_bufRead.clear();
00464 }
00465 
00467 //
00469 
00470 qlonglong BufferedSocketDevice::ReadBlock( char *data, qulonglong maxlen )
00471 {
00472     if ( data == 0 && maxlen != 0 ) 
00473         return -1;
00474 
00475     if ( !m_pSocket->isOpen() ) 
00476         return -1;
00477 
00478     ReadBytes();
00479 
00480     if ( maxlen >= (qulonglong)m_bufRead.size() )
00481         maxlen = m_bufRead.size();
00482 
00483     m_bufRead.consumeBytes( maxlen, data );
00484 
00485     return maxlen;
00486 
00487 }
00488 
00490 //
00492 
00493 qlonglong BufferedSocketDevice::WriteBlock( const char *data, qulonglong len )
00494 {
00495     if ( len == 0 )
00496         return 0;
00497 
00498     QByteArray *a = m_bufWrite.back();
00499 
00500     bool writeNow = ( (m_nWriteSize + len >= 1400) || (len > 512) );
00501 
00502     if ( a && (a->size() + len < 128) ) 
00503     {
00504         // small buffer, resize
00505         int i = a->size();
00506 
00507         a->resize( i+len );
00508         memcpy( a->data()+i, data, len );
00509     } 
00510     else 
00511     {
00512         // append new buffer
00513         m_bufWrite.push_back(new QByteArray(data, len));
00514     }
00515 
00516     m_nWriteSize += len;
00517 
00518     if ( writeNow )
00519         Flush();
00520 
00521     return len;
00522 }
00523 
00525 //
00527 
00528 qlonglong BufferedSocketDevice::WriteBlockDirect(
00529     const char *data, qulonglong len)
00530 {
00531     qlonglong nWritten = 0;
00532         
00533     // must Flush data just in case caller is mixing buffered & un-buffered calls
00534 
00535     Flush();
00536 
00537     if (m_nDestPort != 0)
00538         nWritten = m_pSocket->writeBlock( data, len, m_DestHostAddress, m_nDestPort );
00539     else
00540         nWritten = m_pSocket->writeBlock( data, len );
00541 
00542     return nWritten;
00543 }
00544 
00546 //
00548 
00549 int BufferedSocketDevice::Getch()
00550 {
00551     if ( m_pSocket->isOpen() )
00552     {
00553         ReadBytes();
00554 
00555         if (m_bufRead.size() > 0 ) 
00556         {
00557             uchar c;
00558 
00559             m_bufRead.consumeBytes( 1, (char*)&c );
00560         
00561             return c;
00562         }
00563     }
00564 
00565     return -1;
00566 }
00567 
00569 //
00571 
00572 int BufferedSocketDevice::Putch( int ch )
00573 {
00574     char buf[2];
00575 
00576     buf[0] = ch;
00577 
00578     return WriteBlock(buf, 1) == 1 ? ch : -1;
00579 }
00580 
00582 //
00584 
00585 int BufferedSocketDevice::Ungetch(int ch )
00586 {
00587     return m_bufRead.ungetch( ch );
00588 }
00589 
00591 //
00593 
00594 bool BufferedSocketDevice::CanReadLine()
00595 {
00596     ReadBytes();
00597 
00598     if (( BytesAvailable() > 0 ) && m_bufRead.scanNewline( 0 ) )
00599         return true;
00600 
00601     return false;
00602 }
00603                                
00605 //
00607 
00608 QString BufferedSocketDevice::ReadLine()
00609 {
00610     QByteArray a;
00611     a.resize(256);
00612 
00613     ReadBytes();
00614 
00615     bool nl = m_bufRead.scanNewline( &a );
00616 
00617     QString s;
00618 
00619     if ( nl ) 
00620     {
00621         At( a.size() );     // skips the data read
00622 
00623         s = QString( a );
00624     }
00625 
00626     return s;
00627 }
00628 
00630 //
00632 
00633 QString BufferedSocketDevice::ReadLine( int msecs )
00634 {
00635     MythTimer timer;
00636     QString   sLine;
00637 
00638     if ( CanReadLine() )
00639         return( ReadLine() );
00640         
00641     // ----------------------------------------------------------------------
00642     // If the user supplied a timeout, lets loop until we can read a line 
00643     // or timeout.
00644     // ----------------------------------------------------------------------
00645 
00646     if ( msecs > 0)
00647     {
00648         bool bTimeout = false;
00649 
00650         timer.start();
00651 
00652         while ( !CanReadLine() && !bTimeout )
00653         {
00654 #if 0
00655             LOG(VB_UPNP, LOG_DEBUG, "Can't Read Line... Waiting for more." );
00656 #endif
00657 
00658             WaitForMore( msecs, &bTimeout );
00659 
00660             if ( timer.elapsed() >= msecs ) 
00661             {
00662                 bTimeout = true;
00663                 LOG(VB_UPNP, LOG_INFO, "Exceeded Total Elapsed Wait Time." );
00664             }
00665         }
00666             
00667         if (CanReadLine())
00668             sLine = ReadLine();
00669     }
00670 
00671     return( sLine );
00672 }
00673 
00675 //
00677 
00678 quint16 BufferedSocketDevice::Port(void) const
00679 {
00680     if (m_pSocket)
00681         return( m_pSocket->port() );
00682 
00683     return 0;
00684 }
00685 
00687 //
00689 
00690 quint16 BufferedSocketDevice::PeerPort(void) const
00691 {
00692     if (m_pSocket)
00693         return( m_pSocket->peerPort() );
00694 
00695     return 0;
00696 }
00697 
00699 //
00701 
00702 QHostAddress BufferedSocketDevice::Address() const
00703 {
00704     if (m_pSocket)
00705         return( m_pSocket->address() );
00706 
00707     QHostAddress tmp;
00708 
00709     return tmp;
00710 }
00711 
00713 //
00715 
00716 QHostAddress BufferedSocketDevice::PeerAddress() const
00717 {
00718     if (m_pSocket)
00719         return( m_pSocket->peerAddress() );
00720 
00721     QHostAddress tmp;
00722 
00723     return tmp;
00724 }
00725 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends