MythTV  0.26-pre
RTCP.cpp
Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2005 Live Networks, Inc.  All rights reserved.
00018 // RTCP
00019 // Implementation
00020 
00021 #include "RTCP.hh"
00022 #include "GroupsockHelper.hh"
00023 #include "rtcp_from_spec.h"
00024 
00026 
00027 class RTCPMemberDatabase {
00028 public:
00029   RTCPMemberDatabase(RTCPInstance& ourRTCPInstance)
00030     : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 /*ourself*/),
00031       fTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00032   }
00033 
00034   virtual ~RTCPMemberDatabase() {
00035         delete fTable;
00036   }
00037 
00038   Boolean isMember(unsigned ssrc) const {
00039     return fTable->Lookup((char*)(long)ssrc) != NULL;
00040   }
00041 
00042   Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) {
00043     Boolean isNew = !isMember(ssrc);
00044 
00045     if (isNew) {
00046       ++fNumMembers;
00047     }
00048 
00049     // Record the current time, so we can age stale members
00050     fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount);
00051 
00052     return isNew;
00053   }
00054 
00055   Boolean remove(unsigned ssrc) {
00056     Boolean wasPresent = fTable->Remove((char*)(long)ssrc);
00057     if (wasPresent) {
00058       --fNumMembers;
00059     }
00060     return wasPresent;
00061   }
00062 
00063   unsigned numMembers() const {
00064     return fNumMembers;
00065   }
00066 
00067   void reapOldMembers(unsigned threshold);
00068 
00069 private:
00070   RTCPInstance& fOurRTCPInstance;
00071   unsigned fNumMembers;
00072   HashTable* fTable;
00073 };
00074 
00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) {
00076   Boolean foundOldMember;
00077   unsigned oldSSRC = 0;
00078 
00079   do {
00080     foundOldMember = False;
00081 
00082     HashTable::Iterator* iter
00083       = HashTable::Iterator::create(*fTable);
00084     unsigned long timeCount;
00085     char const* key;
00086     while ((timeCount = (unsigned long)(iter->next(key))) != 0) {
00087 #ifdef DEBUG
00088       fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold);
00089 #endif
00090       if (timeCount < (unsigned long)threshold) { // this SSRC is old
00091         unsigned long ssrc = (unsigned long)key;
00092         oldSSRC = (unsigned)ssrc;
00093         foundOldMember = True;
00094       }
00095     }
00096     delete iter;
00097 
00098     if (foundOldMember) {
00099 #ifdef DEBUG
00100         fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC);
00101 #endif
00102       fOurRTCPInstance.removeSSRC(oldSSRC, True);
00103     }
00104   } while (foundOldMember);
00105 }
00106 
00107 
00109 
00110 static double dTimeNow() {
00111     struct timeval timeNow;
00112     gettimeofday(&timeNow, NULL);
00113     return (double) (timeNow.tv_sec + timeNow.tv_usec/1000000.0);
00114 }
00115 
// Maximum RTCP packet size, in bytes
// (1500, minus some allowance for IP, UDP, UMTP headers):
static const unsigned maxPacketSize = 1450;

// Preferred RTCP packet size, in bytes:
static const unsigned preferredPacketSize = 1000;
00119 
00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
00121                            unsigned totSessionBW,
00122                            unsigned char const* cname,
00123                            RTPSink* sink, RTPSource const* source,
00124                            Boolean isSSMSource)
00125   : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
00126     fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
00127     fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
00128     fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
00129     fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
00130     fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
00131     fHaveJustSentPacket(False), fLastPacketSentSize(0),
00132     fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
00133     fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
00134     fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
00135     fSpecificRRHandlerTable(NULL) {
00136 #ifdef DEBUG
00137   fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
00138 #endif
00139   if (isSSMSource) RTCPgs->multicastSendOnly(); // don't receive multicast
00140     
00141   double timeNow = dTimeNow();
00142   fPrevReportTime = fNextReportTime = timeNow;
00143 
00144   fKnownMembers = new RTCPMemberDatabase(*this);
00145   fInBuf = new unsigned char[maxPacketSize];
00146   if (fKnownMembers == NULL || fInBuf == NULL) return;
00147 
00148   // A hack to save buffer space, because RTCP packets are always small:
00149   unsigned savedMaxSize = OutPacketBuffer::maxSize;
00150   OutPacketBuffer::maxSize = maxPacketSize;
00151   fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);
00152   OutPacketBuffer::maxSize = savedMaxSize;
00153   if (fOutBuf == NULL) return;
00154 
00155   // Arrange to handle incoming reports from others:
00156   TaskScheduler::BackgroundHandlerProc* handler
00157     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00158   fRTCPInterface.startNetworkReading(handler);
00159   
00160   // Send our first report.
00161   fTypeOfEvent = EVENT_REPORT;
00162   onExpire(this);
00163 }
00164 
00165 struct RRHandlerRecord {
00166   TaskFunc* rrHandlerTask;
00167   void* rrHandlerClientData;
00168 };
00169 
00170 RTCPInstance::~RTCPInstance() {
00171 #ifdef DEBUG
00172   fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this);
00173 #endif
00174   // Turn off background read handling:
00175   fRTCPInterface.stopNetworkReading();
00176 
00177   // Begin by sending a BYE.  We have to do this immediately, without
00178   // 'reconsideration', because "this" is going away.
00179   fTypeOfEvent = EVENT_BYE; // not used, but...
00180   sendBYE();
00181 
00182   if (fSpecificRRHandlerTable != NULL) {
00183     AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable);
00184     RRHandlerRecord* rrHandler;
00185     while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) {
00186       delete rrHandler;
00187     }
00188     delete fSpecificRRHandlerTable;
00189   }
00190 
00191   delete fKnownMembers;
00192   delete fOutBuf;
00193   delete[] fInBuf;
00194 }
00195 
00196 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
00197                                       unsigned totSessionBW,
00198                                       unsigned char const* cname,
00199                                       RTPSink* sink, RTPSource const* source,
00200                                       Boolean isSSMSource) {
00201   return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
00202                           isSSMSource);
00203 }
00204 
00205 Boolean RTCPInstance::lookupByName(UsageEnvironment& env,
00206                                    char const* instanceName,
00207                                    RTCPInstance*& resultInstance) {
00208   resultInstance = NULL; // unless we succeed
00209 
00210   Medium* medium;
00211   if (!Medium::lookupByName(env, instanceName, medium)) return False;
00212 
00213   if (!medium->isRTCPInstance()) {
00214     env.setResultMsg(instanceName, " is not a RTCP instance");
00215     return False;
00216   }
00217 
00218   resultInstance = (RTCPInstance*)medium;
00219   return True;
00220 }
00221 
00222 Boolean RTCPInstance::isRTCPInstance() const {
00223   return True;
00224 }
00225 
00226 unsigned RTCPInstance::numMembers() const {
00227   if (fKnownMembers == NULL) return 0;
00228 
00229   return fKnownMembers->numMembers();
00230 }
00231 
00232 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData,
00233                                  Boolean handleActiveParticipantsOnly) {
00234   fByeHandlerTask = handlerTask;
00235   fByeHandlerClientData = clientData;
00236   fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly;
00237 }
00238 
00239 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) {
00240   fSRHandlerTask = handlerTask;
00241   fSRHandlerClientData = clientData;
00242 }
00243 
00244 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) {
00245   fRRHandlerTask = handlerTask;
00246   fRRHandlerClientData = clientData;
00247 }
00248 
00249 void RTCPInstance
00250 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
00251                        TaskFunc* handlerTask, void* clientData) {
00252   if (handlerTask == NULL && clientData == NULL) {
00253     unsetSpecificRRHandler(fromAddress, fromPort);
00254     return;
00255   }
00256 
00257   RRHandlerRecord* rrHandler = new RRHandlerRecord;
00258   rrHandler->rrHandlerTask = handlerTask;
00259   rrHandler->rrHandlerClientData = clientData;
00260   if (fSpecificRRHandlerTable == NULL) {
00261     fSpecificRRHandlerTable = new AddressPortLookupTable;
00262   }
00263   fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler);
00264 }
00265 
00266 void RTCPInstance
00267 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) {
00268   if (fSpecificRRHandlerTable == NULL) return;
00269 
00270   RRHandlerRecord* rrHandler
00271     = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort));
00272   if (rrHandler != NULL) {
00273     fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort);
00274     delete rrHandler;
00275   }
00276 }
00277 
00278 void RTCPInstance::setStreamSocket(int sockNum,
00279                                    unsigned char streamChannelId) {
00280   // Turn off background read handling:
00281   fRTCPInterface.stopNetworkReading();
00282 
00283   // Switch to RTCP-over-TCP:
00284   fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00285 
00286   // Turn background reading back on:
00287   TaskScheduler::BackgroundHandlerProc* handler
00288     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00289   fRTCPInterface.startNetworkReading(handler);
00290 }
00291 
00292 void RTCPInstance::addStreamSocket(int sockNum,
00293                                    unsigned char streamChannelId) {
00294   // Add the RTCP-over-TCP interface:
00295   fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00296 
00297   // Turn on background reading for this socket (in case it's not on already):
00298   TaskScheduler::BackgroundHandlerProc* handler
00299     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00300   fRTCPInterface.startNetworkReading(handler);
00301 }
00302 
static unsigned const IP_UDP_HDR_SIZE = 28;
    // overhead (bytes) of IP and UDP hdrs

// Consumes "n" bytes of the packet being parsed: moves the local cursor
// "pkt" forward and reduces the local remaining-byte count "packetSize".
// (Both variables must be in scope wherever this is used.)
// The body is wrapped in "do { } while (0)" so that the macro expands to a
// single statement; otherwise, in a context such as "if (x) ADVANCE(4);",
// only the pointer update would be governed by the "if".
#define ADVANCE(n) do { pkt += (n); packetSize -= (n); } while (0)
00307 
00308 void RTCPInstance::incomingReportHandler(RTCPInstance* instance,
00309                                          int /*mask*/) {
00310   instance->incomingReportHandler1();
00311 }
00312 
      // Reads one incoming RTCP packet into "fInBuf", validates it, and walks
      // each of the 'subpackets' that it contains:
      //  - SR: notes the sender's timestamps in our source's reception stats,
      //        calls the SR handler (if set), then continues as for a RR;
      //  - RR: feeds report blocks about our own SSRC into our sink's
      //        transmission stats, and calls the RR handler(s) (if set);
      //  - BYE: calls the BYE handler (if set; by default only once);
      //  - anything else is skipped over.
      // If the whole packet parses cleanly, "onReceive()" is then called.
      // Note that the handlers are invoked *while* parsing - i.e., before the
      // remainder of the packet has been validated.
      // The body is wrapped in "do { ... } while (0)" so that "break" can be
      // used to abandon the packet from anywhere outside the inner loop.
00313 void RTCPInstance::incomingReportHandler1() {
00314   unsigned char* pkt = fInBuf;
00315   unsigned packetSize;
00316   struct sockaddr_in fromAddress;
00317   int typeOfPacket = PACKET_UNKNOWN_TYPE;
00318 
00319   do {
00320     if (!fRTCPInterface.handleRead(pkt, maxPacketSize,
00321                                    packetSize, fromAddress)) {
00322       break;
00323     }
00324 
00325     // Ignore the packet if it was looped-back from ourself:
00326     if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) {
00327       // However, we still want to handle incoming RTCP packets from
00328       // *other processes* on the same machine.  To distinguish this
00329       // case from a true loop-back, check whether we've just sent a
00330       // packet of the same size.  (This check isn't perfect, but it seems
00331       // to be the best we can do.)
00332       if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) {
00333         // This is a true loop-back:
00334         fHaveJustSentPacket = False;
00335         break; // ignore this packet
00336       }
00337     }
00338 
00339     if (fIsSSMSource) {
00340       // This packet was received via unicast.  'Reflect' it by resending
00341       // it to the multicast group.
00342       // NOTE: Denial-of-service attacks are possible here.
00343       // Users of this software may wish to add their own,
00344       // application-specific mechanism for 'authenticating' the
00345       // validity of this packet before reflecting it.
00346       fRTCPInterface.sendPacket(pkt, packetSize);
      // Record the send, so that the loop-back test above can recognize
      // this packet if it comes back to us:
00347       fHaveJustSentPacket = True;
00348       fLastPacketSentSize = packetSize;
00349     }
00350 
00351 #ifdef DEBUG
00352     fprintf(stderr, "[%p]saw incoming RTCP packet (from address %s, port %d)\n", this, our_inet_ntoa(fromAddress.sin_addr), ntohs(fromAddress.sin_port));
00353     unsigned char* p = pkt;
00354     for (unsigned i = 0; i < packetSize; ++i) {
00355       if (i%4 == 0) fprintf(stderr, " ");
00356       fprintf(stderr, "%02x", p[i]);
00357     }
00358     fprintf(stderr, "\n");
00359 #endif
      // Compute the total size now, because "packetSize" gets reduced (by
      // "ADVANCE()") as the packet is parsed:
00360     int totPacketSize = IP_UDP_HDR_SIZE + packetSize;
00361 
00362     // Check the RTCP packet for validity:
00363     // It must at least contain a header (4 bytes), and this header
00364     // must be version=2, with no padding bit, and a payload type of
00365     // SR (200) or RR (201):
00366     if (packetSize < 4) break;
00367     unsigned rtcpHdr = ntohl(*(unsigned*)pkt);
      // The mask selects the version and padding bits, plus all but the
      // lowest bit of the payload type (so that SR and RR both match):
00368     if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) {
00369 #ifdef DEBUG
00370       fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr);
00371 #endif
00372       break;
00373     }
00374 
00375     // Process each of the individual RTCP 'subpackets' in (what may be)
00376     // a compound RTCP packet.
00377     unsigned reportSenderSSRC = 0;
00378     Boolean packetOK = False;
00379     while (1) {
      // Unpack the header: count (5 bits), payload type (8 bits), and
      // length (16 bits, in 32-bit words; converted here to bytes):
00380       unsigned rc = (rtcpHdr>>24)&0x1F;
00381       unsigned pt = (rtcpHdr>>16)&0xFF;
00382       unsigned length = 4*(rtcpHdr&0xFFFF); // doesn't count hdr
00383       ADVANCE(4); // skip over the header
00384       if (length > packetSize) break;
00385 
00386       // Assume that each RTCP subpacket begins with a 4-byte SSRC:
00387       if (length < 4) break; length -= 4;
00388       reportSenderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4);
00389 
      // From here on, "length" is the number of bytes of this subpacket
      // that have not yet been consumed.  A "break" out of the "switch"
      // with "subPacketOK" still False abandons the whole packet.
00390       Boolean subPacketOK = False;
00391       switch (pt) {
00392         case RTCP_PT_SR: {
00393 #ifdef DEBUG
00394           fprintf(stderr, "SR\n");
00395 #endif
      // The 'sender info' is 20 bytes: NTP timestamp (8), RTP timestamp (4),
      // packet count (4), octet count (4):
00396           if (length < 20) break; length -= 20;
00397 
00398           // Extract the NTP timestamp, and note this:
00399           unsigned NTPmsw = ntohl(*(unsigned*)pkt); ADVANCE(4);
00400           unsigned NTPlsw = ntohl(*(unsigned*)pkt); ADVANCE(4);
00401           unsigned rtpTimestamp = ntohl(*(unsigned*)pkt); ADVANCE(4);
00402           if (fSource != NULL) {
00403             RTPReceptionStatsDB& receptionStats
00404               = fSource->receptionStatsDB();
00405             receptionStats.noteIncomingSR(reportSenderSSRC,
00406                                           NTPmsw, NTPlsw, rtpTimestamp);
00407           }
00408           ADVANCE(8); // skip over packet count, octet count
00409 
00410           // If a 'SR handler' was set, call it now:
00411           if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData);
00412 
00413           // The rest of the SR is handled like a RR (so, no "break;" here)
00414         }
00415         case RTCP_PT_RR: {
00416 #ifdef DEBUG
00417           fprintf(stderr, "RR\n");
00418 #endif
      // Each report block is 6 32-bit words; "rc" says how many there are:
00419           unsigned reportBlocksSize = rc*(6*4);
00420           if (length < reportBlocksSize) break;
00421           length -= reportBlocksSize;
00422 
00423           if (fSink != NULL) {
00424             // Use this information to update stats about our transmissions:
00425             RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB();
00426             for (unsigned i = 0; i < rc; ++i) {
00427               unsigned senderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4);
00428               // We care only about reports about our own transmission, not others'
00429               if (senderSSRC == fSink->SSRC()) {
00430                 unsigned lossStats = ntohl(*(unsigned*)pkt); ADVANCE(4);
00431                 unsigned highestReceived = ntohl(*(unsigned*)pkt); ADVANCE(4);
00432                 unsigned jitter = ntohl(*(unsigned*)pkt); ADVANCE(4);
00433                 unsigned timeLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4);
00434                 unsigned timeSinceLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4);
00435                 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress,
00436                                                  lossStats,
00437                                                  highestReceived, jitter,
00438                                                  timeLastSR, timeSinceLastSR);
00439               } else {
      // Skip the remaining 5 words of a block that's about someone else:
00440                 ADVANCE(4*5);
00441               }
00442             }
00443           } else {
      // We have no sink, so the report blocks are of no interest to us:
00444             ADVANCE(reportBlocksSize);
00445           }
00446 
00447           if (pt == RTCP_PT_RR) { // i.e., we didn't fall through from 'SR'
00448             // If a 'RR handler' was set, call it now:
00449 
00450             // Specific RR handler:
00451             if (fSpecificRRHandlerTable != NULL) {
00452               netAddressBits fromAddr = fromAddress.sin_addr.s_addr;
00453               Port fromPort(ntohs(fromAddress.sin_port)); 
00454               RRHandlerRecord* rrHandler
00455                 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
00456               if (rrHandler != NULL) {
00457                 if (rrHandler->rrHandlerTask != NULL) {
00458                   (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
00459                 }
00460               }
00461             }
00462 
00463             // General RR handler:
00464             if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData);
00465           }
00466 
00467           subPacketOK = True;
00468           typeOfPacket = PACKET_RTCP_REPORT;
00469           break;
00470         }
00471         case RTCP_PT_BYE: {
00472 #ifdef DEBUG
00473           fprintf(stderr, "BYE\n");
00474 #endif
00475           // If a 'BYE handler' was set, call it now:
      // (If "fByeHandleActiveParticipantsOnly" is set, then do so only if
      // the SSRC is known to our reception or transmission stats.)
00476           TaskFunc* byeHandler = fByeHandlerTask;
00477           if (byeHandler != NULL
00478               && (!fByeHandleActiveParticipantsOnly
00479                   || (fSource != NULL
00480                       && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL)
00481                   || (fSink != NULL
00482                       && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) {
      // The handler is cleared *before* it's called, using the local copy:
00483             fByeHandlerTask = NULL;
00484                 // we call this only once by default 
00485             (*byeHandler)(fByeHandlerClientData);
00486           }
00487 
00488           // We should really check for & handle >1 SSRCs being present #####
00489 
00490           subPacketOK = True;
00491           typeOfPacket = PACKET_BYE;
00492           break;
00493         }
00494         // Later handle SDES, APP, and compound RTCP packets #####
00495         default:
00496 #ifdef DEBUG
00497           fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt);
00498 #endif
      // Unsupported types are accepted, but their contents are skipped
      // (by the "ADVANCE(length)" below); "typeOfPacket" is left unchanged.
00499           subPacketOK = True;
00500           break;
00501       }  
00502       if (!subPacketOK) break;
00503 
00504       // need to check for (& handle) SSRC collision! #####
00505 
00506 #ifdef DEBUG
00507       fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC);
00508 #endif
00509       
00510       // Skip over any remaining bytes in this subpacket:
00511       ADVANCE(length);
00512 
00513       // Check whether another RTCP 'subpacket' follows:
00514       if (packetSize == 0) {
      // Everything was consumed exactly; this is the only successful exit:
00515         packetOK = True;
00516         break;
00517       } else if (packetSize < 4) {
00518 #ifdef DEBUG
00519         fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize);
00520 #endif
00521         break;
00522       }
00523       rtcpHdr = ntohl(*(unsigned*)pkt);
      // For subsequent subpackets, only the version (2) is checked:
00524       if ((rtcpHdr & 0xC0000000) != 0x80000000) {
00525 #ifdef DEBUG
00526         fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00527 #endif
00528         break;
00529       }
00530     }
00531       
00532     if (!packetOK) {
00533 #ifdef DEBUG
00534       fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00535 #endif
00536       break;
00537     } else {
00538 #ifdef DEBUG
00539       fprintf(stderr, "validated entire RTCP packet\n");
00540 #endif
00541     }
00542       
      // Note: "reportSenderSSRC" and "typeOfPacket" hold whatever the last
      // subpacket(s) parsed left in them.
00543     onReceive(typeOfPacket, totPacketSize, reportSenderSSRC);
00544   } while (0);
00545 }
00546 
00547 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize,
00548                              unsigned ssrc) {
00549   fTypeOfPacket = typeOfPacket;
00550   fLastReceivedSize = totPacketSize;
00551   fLastReceivedSSRC = ssrc;
00552 
00553   int members = (int)numMembers();
00554   int senders = (fSink != NULL) ? 1 : 0;
00555 
00556   OnReceive(this, // p
00557             this, // e
00558             &members, // members
00559             &fPrevNumMembers, // pmembers
00560             &senders, // senders
00561             &fAveRTCPSize, // avg_rtcp_size
00562             &fPrevReportTime, // tp
00563             dTimeNow(), // tc
00564             fNextReportTime);
00565 }
00566 
00567 void RTCPInstance::sendReport() {
00568 #ifdef DEBUG
00569   fprintf(stderr, "sending REPORT\n");
00570 #endif
00571   // Begin by including a SR and/or RR report:
00572   addReport();
00573 
00574   // Then, include a SDES:
00575   addSDES();
00576 
00577   // Send the report:
00578   sendBuiltPacket();
00579 
00580   // Periodically clean out old members from our SSRC membership database:
00581   const unsigned membershipReapPeriod = 5;
00582   if ((++fOutgoingReportCount) % membershipReapPeriod == 0) {
00583     unsigned threshold = fOutgoingReportCount - membershipReapPeriod;
00584     fKnownMembers->reapOldMembers(threshold);
00585   }
00586 }
00587 
00588 void RTCPInstance::sendBYE() {
00589 #ifdef DEBUG
00590   fprintf(stderr, "sending BYE\n");
00591 #endif
00592   // The packet must begin with a SR and/or RR report:
00593   addReport();
00594 
00595   addBYE();
00596   sendBuiltPacket();
00597 }
00598 
00599 void RTCPInstance::sendBuiltPacket() {
00600 #ifdef DEBUG
00601   fprintf(stderr, "sending RTCP packet\n");
00602   unsigned char* p = fOutBuf->packet();
00603   for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) {
00604     if (i%4 == 0) fprintf(stderr," ");
00605     fprintf(stderr, "%02x", p[i]);
00606   }
00607   fprintf(stderr, "\n");
00608 #endif
00609   unsigned reportSize = fOutBuf->curPacketSize();
00610   fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize);
00611   fOutBuf->resetOffset();
00612 
00613   fLastSentSize = IP_UDP_HDR_SIZE + reportSize;
00614   fHaveJustSentPacket = True;
00615   fLastPacketSentSize = reportSize;
00616 }
00617 
00618 int RTCPInstance::checkNewSSRC() {
00619   return fKnownMembers->noteMembership(fLastReceivedSSRC,
00620                                        fOutgoingReportCount);
00621 }
00622 
00623 void RTCPInstance::removeLastReceivedSSRC() {
00624   removeSSRC(fLastReceivedSSRC, False/*keep stats around*/);
00625 }
00626 
00627 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) {
00628   fKnownMembers->remove(ssrc);
00629 
00630   if (alsoRemoveStats) {
00631     // Also, remove records of this SSRC from any reception or transmission stats
00632     if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc);
00633     if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc);
00634   }
00635 }
00636 
00637 void RTCPInstance::onExpire(RTCPInstance* instance) {
00638   instance->onExpire1();
00639 }
00640 
00641 // Member functions to build specific kinds of report:
00642 
00643 void RTCPInstance::addReport() {
00644   // Include a SR or a RR, depending on whether we
00645   // have an associated sink or source:
00646   if (fSink != NULL) {
00647     addSR();
00648   } else if (fSource != NULL) {
00649     addRR();
00650   }
00651 }
00652 
00653 void RTCPInstance::addSR() {
00654   // ASSERT: fSink != NULL
00655 
00656   enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(),
00657                             5 /* extra words in a SR */);
00658 
00659   // Now, add the 'sender info' for our sink
00660 
00661   // Insert the NTP and RTP timestamps for the 'wallclock time':
00662   struct timeval timeNow;
00663   gettimeofday(&timeNow, NULL);
00664   fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80);
00665       // NTP timestamp most-significant word (1970 epoch -> 1900 epoch)
00666   double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000; // 2^32/10^6
00667   fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5));
00668       // NTP timestamp least-significant word
00669   unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow);
00670   fOutBuf->enqueueWord(rtpTimestamp); // RTP ts
00671 
00672   // Insert the packet and byte counts:
00673   fOutBuf->enqueueWord(fSink->packetCount());
00674   fOutBuf->enqueueWord(fSink->octetCount());
00675 
00676   enqueueCommonReportSuffix();
00677 }
00678 
00679 void RTCPInstance::addRR() {
00680   // ASSERT: fSource != NULL
00681 
00682   enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC());
00683   enqueueCommonReportSuffix();
00684 }
00685 
00686 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType,
00687                                              unsigned SSRC,
00688                                              unsigned numExtraWords) {
00689   unsigned numReportingSources;
00690   if (fSource == NULL) {
00691     numReportingSources = 0; // we don't receive anything
00692   } else {
00693     RTPReceptionStatsDB& allReceptionStats
00694       = fSource->receptionStatsDB();
00695     numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset();
00696     // This must be <32, to fit in 5 bits:
00697     if (numReportingSources >= 32) { numReportingSources = 32; }
00698     // Later: support adding more reports to handle >32 sources (unlikely)#####
00699   }
00700 
00701   unsigned rtcpHdr = 0x80000000; // version 2, no padding
00702   rtcpHdr |= (numReportingSources<<24);
00703   rtcpHdr |= (packetType<<16);
00704   rtcpHdr |= (1 + numExtraWords + 6*numReportingSources);
00705       // each report block is 6 32-bit words long
00706   fOutBuf->enqueueWord(rtcpHdr);
00707 
00708   fOutBuf->enqueueWord(SSRC);
00709 }
00710 
00711 void RTCPInstance::enqueueCommonReportSuffix() {
00712   // Output the report blocks for each source:
00713   if (fSource != NULL) { 
00714     RTPReceptionStatsDB& allReceptionStats
00715       = fSource->receptionStatsDB();
00716 
00717     RTPReceptionStatsDB::Iterator iterator(allReceptionStats);
00718     while (1) {
00719       RTPReceptionStats* receptionStats = iterator.next();
00720       if (receptionStats == NULL) break;
00721       enqueueReportBlock(receptionStats);
00722     }
00723 
00724     allReceptionStats.reset(); // because we have just generated a report
00725   }
00726 }
00727 
00728 void
00729 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) {
00730   fOutBuf->enqueueWord(stats->SSRC());
00731 
00732   unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived();
00733 
00734   unsigned totNumExpected
00735     = highestExtSeqNumReceived - stats->baseExtSeqNumReceived();
00736   int totNumLost = totNumExpected - stats->totNumPacketsReceived();
00737   // 'Clamp' this loss number to a 24-bit signed value:
00738   if (totNumLost > 0x007FFFFF) {
00739     totNumLost = 0x007FFFFF;
00740   } else if (totNumLost < 0) {
00741     if (totNumLost < -0x00800000) totNumLost = 0x00800000; // unlikely, but...
00742     totNumLost &= 0x00FFFFFF;
00743   }
00744 
00745   unsigned numExpectedSinceLastReset
00746     = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived();
00747   int numLostSinceLastReset
00748     = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset(); 
00749   unsigned char lossFraction;
00750   if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) {
00751     lossFraction = 0;
00752   } else {
00753     lossFraction = (unsigned char)
00754       ((numLostSinceLastReset << 8) / numExpectedSinceLastReset);
00755   }
00756   
00757   fOutBuf->enqueueWord((lossFraction<<24) | totNumLost);
00758   fOutBuf->enqueueWord(highestExtSeqNumReceived);
00759 
00760   fOutBuf->enqueueWord(stats->jitter());
00761 
00762   unsigned NTPmsw = stats->lastReceivedSR_NTPmsw();
00763   unsigned NTPlsw = stats->lastReceivedSR_NTPlsw();
00764   unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16); // middle 32 bits
00765   fOutBuf->enqueueWord(LSR);
00766 
00767   // Figure out how long has elapsed since the last SR rcvd from this src:
00768   struct timeval const& LSRtime = stats->lastReceivedSR_time(); // "last SR"
00769   struct timeval timeNow, timeSinceLSR;
00770   gettimeofday(&timeNow, NULL);
00771   if (timeNow.tv_usec < LSRtime.tv_usec) {
00772     timeNow.tv_usec += 1000000;
00773     timeNow.tv_sec -= 1;
00774   }
00775   timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec;
00776   timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec;
00777   // The enqueued time is in units of 1/65536 seconds.
00778   // (Note that 65536/1000000 == 1024/15625) 
00779   unsigned DLSR;
00780   if (LSR == 0) {
00781     DLSR = 0;
00782   } else {
00783     DLSR = (timeSinceLSR.tv_sec<<16)
00784          | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF);
00785   }
00786   fOutBuf->enqueueWord(DLSR);
00787 }
00788 
00789 void RTCPInstance::addSDES() {
00790   // For now we support only the CNAME item; later support more #####
00791 
00792   // Begin by figuring out the size of the entire SDES report:
00793   unsigned numBytes = 4;
00794       // counts the SSRC, but not the header; it'll get subtracted out
00795   numBytes += fCNAME.totalSize(); // includes id and length
00796   numBytes += 1; // the special END item
00797 
00798   unsigned num4ByteWords = (numBytes + 3)/4;
00799 
00800   unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC chunk
00801   rtcpHdr |= (RTCP_PT_SDES<<16);
00802   rtcpHdr |= num4ByteWords;
00803   fOutBuf->enqueueWord(rtcpHdr);
00804 
00805   if (fSource != NULL) {
00806     fOutBuf->enqueueWord(fSource->SSRC());
00807   } else if (fSink != NULL) {
00808     fOutBuf->enqueueWord(fSink->SSRC());
00809   }
00810 
00811   // Add the CNAME:
00812   fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize());
00813 
00814   // Add the 'END' item (i.e., a zero byte), plus any more needed to pad:
00815   unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4);
00816   unsigned char const zero = '\0';
00817   while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1);
00818 }
00819 
00820 void RTCPInstance::addBYE() {
00821   unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC
00822   rtcpHdr |= (RTCP_PT_BYE<<16);
00823   rtcpHdr |= 1; // 2 32-bit words total (i.e., with 1 SSRC)
00824   fOutBuf->enqueueWord(rtcpHdr);
00825 
00826   if (fSource != NULL) {
00827     fOutBuf->enqueueWord(fSource->SSRC());
00828   } else if (fSink != NULL) {
00829     fOutBuf->enqueueWord(fSink->SSRC());
00830   }
00831 }
00832 
00833 void RTCPInstance::schedule(double nextTime) {
00834   fNextReportTime = nextTime;
00835 
00836   double secondsToDelay = nextTime - dTimeNow();
00837 #ifdef DEBUG
00838   fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime);
00839 #endif
00840   int usToGo = (int)(secondsToDelay * 1000000);
00841   nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo,
00842                                 (TaskFunc*)RTCPInstance::onExpire, this);
00843 }
00844 
00845 void RTCPInstance::reschedule(double nextTime) {
00846   envir().taskScheduler().unscheduleDelayedTask(nextTask());
00847   schedule(nextTime);
00848 }
00849 
00850 void RTCPInstance::onExpire1() {
00851   // Note: fTotSessionBW is kbits per second
00852   double rtcpBW = 0.05*fTotSessionBW*1024/8; // -> bytes per second
00853 
00854   OnExpire(this, // event
00855            numMembers(), // members
00856            (fSink != NULL) ? 1 : 0, // senders
00857            rtcpBW, // rtcp_bw
00858            (fSink != NULL) ? 1 : 0, // we_sent
00859            &fAveRTCPSize, // ave_rtcp_size
00860            &fIsInitial, // initial
00861            dTimeNow(), // tc
00862            &fPrevReportTime, // tp
00863            &fPrevNumMembers // pmembers
00864            );
00865 }
00866 
00868 
00869 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) {
00870   unsigned length = strlen((char const*)value);
00871   if (length > 511) length = 511;
00872 
00873   fData[0] = tag;
00874   fData[1] = (unsigned char)length;
00875   memmove(&fData[2], value, length);
00876 
00877   // Pad the trailing bytes to a 4-byte boundary:
00878   while ((length)%4 > 0) fData[2 + length++] = '\0';
00879 }
00880 
00881 unsigned SDESItem::totalSize() const {
00882   return 2 + (unsigned)fData[1];
00883 }
00884 
00885 
00887 
00888 extern "C" void Schedule(double nextTime, event e) {
00889   RTCPInstance* instance = (RTCPInstance*)e;
00890   if (instance == NULL) return;
00891 
00892   instance->schedule(nextTime);
00893 }
00894 
00895 extern "C" void Reschedule(double nextTime, event e) {
00896   RTCPInstance* instance = (RTCPInstance*)e;
00897   if (instance == NULL) return;
00898 
00899   instance->reschedule(nextTime);
00900 }
00901 
00902 extern "C" void SendRTCPReport(event e) {
00903   RTCPInstance* instance = (RTCPInstance*)e;
00904   if (instance == NULL) return;
00905 
00906   instance->sendReport();
00907 }
00908 
00909 extern "C" void SendBYEPacket(event e) {
00910   RTCPInstance* instance = (RTCPInstance*)e;
00911   if (instance == NULL) return;
00912 
00913   instance->sendBYE();
00914 }
00915 
00916 extern "C" int TypeOfEvent(event e) {
00917   RTCPInstance* instance = (RTCPInstance*)e;
00918   if (instance == NULL) return EVENT_UNKNOWN;
00919 
00920   return instance->typeOfEvent();
00921 }
00922 
00923 extern "C" int SentPacketSize(event e) {
00924   RTCPInstance* instance = (RTCPInstance*)e;
00925   if (instance == NULL) return 0;
00926 
00927   return instance->sentPacketSize();
00928 }
00929 
00930 extern "C" int PacketType(packet p) {
00931   RTCPInstance* instance = (RTCPInstance*)p;
00932   if (instance == NULL) return PACKET_UNKNOWN_TYPE;
00933 
00934   return instance->packetType();
00935 }
00936 
00937 extern "C" int ReceivedPacketSize(packet p) {
00938   RTCPInstance* instance = (RTCPInstance*)p;
00939   if (instance == NULL) return 0;
00940 
00941   return instance->receivedPacketSize();
00942 }
00943 
00944 extern "C" int NewMember(packet p) {
00945   RTCPInstance* instance = (RTCPInstance*)p;
00946   if (instance == NULL) return 0;
00947 
00948   return instance->checkNewSSRC();
00949 }
00950 
00951 extern "C" int NewSender(packet /*p*/) {
00952   return 0; // we don't yet recognize senders other than ourselves #####
00953 }
00954 
00955 extern "C" void AddMember(packet /*p*/) {
00956   // Do nothing; all of the real work was done when NewMember() was called
00957 }
00958 
00959 extern "C" void AddSender(packet /*p*/) {
00960   // we don't yet recognize senders other than ourselves #####
00961 }
00962 
00963 extern "C" void RemoveMember(packet p) {
00964   RTCPInstance* instance = (RTCPInstance*)p;
00965   if (instance == NULL) return;
00966 
00967   instance->removeLastReceivedSSRC();
00968 }
00969 
00970 extern "C" void RemoveSender(packet /*p*/) {
00971   // we don't yet recognize senders other than ourselves #####
00972 }
00973 
00974 extern "C" double drand30() {
00975   unsigned tmp = our_random()&0x3FFFFFFF; // a random 30-bit integer
00976   return tmp/(double)(1024*1024*1024);
00977 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends