|
MythTV
0.26-pre
|
00001 /********** 00002 This library is free software; you can redistribute it and/or modify it under 00003 the terms of the GNU Lesser General Public License as published by the 00004 Free Software Foundation; either version 2.1 of the License, or (at your 00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) 00006 00007 This library is distributed in the hope that it will be useful, but WITHOUT 00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 00009 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for 00010 more details. 00011 00012 You should have received a copy of the GNU Lesser General Public License 00013 along with this library; if not, write to the Free Software Foundation, Inc., 00014 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00015 **********/ 00016 // "liveMedia" 00017 // Copyright (c) 1996-2005 Live Networks, Inc. All rights reserved. 00018 // RTCP 00019 // Implementation 00020 00021 #include "RTCP.hh" 00022 #include "GroupsockHelper.hh" 00023 #include "rtcp_from_spec.h" 00024 00026 00027 class RTCPMemberDatabase { 00028 public: 00029 RTCPMemberDatabase(RTCPInstance& ourRTCPInstance) 00030 : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 /*ourself*/), 00031 fTable(HashTable::create(ONE_WORD_HASH_KEYS)) { 00032 } 00033 00034 virtual ~RTCPMemberDatabase() { 00035 delete fTable; 00036 } 00037 00038 Boolean isMember(unsigned ssrc) const { 00039 return fTable->Lookup((char*)(long)ssrc) != NULL; 00040 } 00041 00042 Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) { 00043 Boolean isNew = !isMember(ssrc); 00044 00045 if (isNew) { 00046 ++fNumMembers; 00047 } 00048 00049 // Record the current time, so we can age stale members 00050 fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount); 00051 00052 return isNew; 00053 } 00054 00055 Boolean remove(unsigned ssrc) { 00056 Boolean wasPresent = fTable->Remove((char*)(long)ssrc); 00057 if (wasPresent) { 00058 --fNumMembers; 00059 } 00060 return wasPresent; 00061 } 00062 00063 unsigned numMembers() const { 00064 return fNumMembers; 00065 } 00066 00067 void reapOldMembers(unsigned threshold); 00068 00069 private: 00070 RTCPInstance& fOurRTCPInstance; 00071 unsigned fNumMembers; 00072 HashTable* fTable; 00073 }; 00074 00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) { 00076 Boolean foundOldMember; 00077 unsigned oldSSRC = 0; 00078 00079 do { 00080 foundOldMember = False; 00081 00082 HashTable::Iterator* iter 00083 = HashTable::Iterator::create(*fTable); 00084 unsigned long timeCount; 00085 char const* key; 00086 while ((timeCount = (unsigned long)(iter->next(key))) != 0) { 00087 #ifdef DEBUG 00088 fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold); 00089 #endif 00090 if (timeCount < (unsigned long)threshold) { // this SSRC is old 00091 unsigned long ssrc = (unsigned long)key; 00092 oldSSRC = (unsigned)ssrc; 00093 foundOldMember = True; 00094 } 00095 } 00096 delete iter; 00097 00098 if (foundOldMember) { 00099 #ifdef DEBUG 00100 fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC); 00101 #endif 00102 fOurRTCPInstance.removeSSRC(oldSSRC, True); 00103 } 00104 } while (foundOldMember); 00105 } 00106 00107 00109 00110 static double dTimeNow() { 00111 struct timeval timeNow; 00112 gettimeofday(&timeNow, NULL); 00113 return (double) (timeNow.tv_sec + timeNow.tv_usec/1000000.0); 00114 } 00115 00116 static unsigned const maxPacketSize = 1450; 00117 // bytes (1500, minus some allowance for IP, UDP, UMTP headers) 00118 static unsigned const preferredPacketSize = 1000; // bytes 00119 00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs, 00121 unsigned totSessionBW, 00122 unsigned char const* cname, 00123 RTPSink* sink, RTPSource const* source, 00124 Boolean isSSMSource) 00125 : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW), 00126 fSink(sink), fSource(source), fIsSSMSource(isSSMSource), 00127 fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1), 00128 fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0), 00129 fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0), 00130 fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE), 00131 fHaveJustSentPacket(False), fLastPacketSentSize(0), 00132 fByeHandlerTask(NULL), fByeHandlerClientData(NULL), 00133 fSRHandlerTask(NULL), fSRHandlerClientData(NULL), 00134 fRRHandlerTask(NULL), fRRHandlerClientData(NULL), 00135 fSpecificRRHandlerTable(NULL) { 00136 #ifdef DEBUG 00137 fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this); 00138 #endif 00139 if (isSSMSource) RTCPgs->multicastSendOnly(); // don't receive multicast 00140 00141 double timeNow = dTimeNow(); 00142 fPrevReportTime = fNextReportTime = timeNow; 00143 00144 fKnownMembers = new RTCPMemberDatabase(*this); 00145 fInBuf = new unsigned char[maxPacketSize]; 00146 if (fKnownMembers == NULL || fInBuf == NULL) return; 00147 00148 // A hack to save buffer space, because RTCP packets are always small: 00149 unsigned savedMaxSize = OutPacketBuffer::maxSize; 00150 OutPacketBuffer::maxSize = maxPacketSize; 00151 fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize); 00152 OutPacketBuffer::maxSize = savedMaxSize; 00153 if (fOutBuf == NULL) return; 00154 00155 // Arrange to handle incoming reports from others: 00156 TaskScheduler::BackgroundHandlerProc* handler 00157 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler; 00158 fRTCPInterface.startNetworkReading(handler); 00159 00160 // Send our first report. 00161 fTypeOfEvent = EVENT_REPORT; 00162 onExpire(this); 00163 } 00164 00165 struct RRHandlerRecord { 00166 TaskFunc* rrHandlerTask; 00167 void* rrHandlerClientData; 00168 }; 00169 00170 RTCPInstance::~RTCPInstance() { 00171 #ifdef DEBUG 00172 fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this); 00173 #endif 00174 // Turn off background read handling: 00175 fRTCPInterface.stopNetworkReading(); 00176 00177 // Begin by sending a BYE. We have to do this immediately, without 00178 // 'reconsideration', because "this" is going away. 00179 fTypeOfEvent = EVENT_BYE; // not used, but... 00180 sendBYE(); 00181 00182 if (fSpecificRRHandlerTable != NULL) { 00183 AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable); 00184 RRHandlerRecord* rrHandler; 00185 while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) { 00186 delete rrHandler; 00187 } 00188 delete fSpecificRRHandlerTable; 00189 } 00190 00191 delete fKnownMembers; 00192 delete fOutBuf; 00193 delete[] fInBuf; 00194 } 00195 00196 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs, 00197 unsigned totSessionBW, 00198 unsigned char const* cname, 00199 RTPSink* sink, RTPSource const* source, 00200 Boolean isSSMSource) { 00201 return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source, 00202 isSSMSource); 00203 } 00204 00205 Boolean RTCPInstance::lookupByName(UsageEnvironment& env, 00206 char const* instanceName, 00207 RTCPInstance*& resultInstance) { 00208 resultInstance = NULL; // unless we succeed 00209 00210 Medium* medium; 00211 if (!Medium::lookupByName(env, instanceName, medium)) return False; 00212 00213 if (!medium->isRTCPInstance()) { 00214 env.setResultMsg(instanceName, " is not a RTCP instance"); 00215 return False; 00216 } 00217 00218 resultInstance = (RTCPInstance*)medium; 00219 return True; 00220 } 00221 00222 Boolean RTCPInstance::isRTCPInstance() const { 00223 return True; 00224 } 00225 00226 unsigned RTCPInstance::numMembers() const { 00227 if (fKnownMembers == NULL) return 0; 00228 00229 return fKnownMembers->numMembers(); 00230 } 00231 00232 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData, 00233 Boolean handleActiveParticipantsOnly) { 00234 fByeHandlerTask = handlerTask; 00235 fByeHandlerClientData = clientData; 00236 fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly; 00237 } 00238 00239 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) { 00240 fSRHandlerTask = handlerTask; 00241 fSRHandlerClientData = clientData; 00242 } 00243 00244 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) { 00245 fRRHandlerTask = handlerTask; 00246 fRRHandlerClientData = clientData; 00247 } 00248 00249 void RTCPInstance 00250 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort, 00251 TaskFunc* handlerTask, void* clientData) { 00252 if (handlerTask == NULL && clientData == NULL) { 00253 unsetSpecificRRHandler(fromAddress, fromPort); 00254 return; 00255 } 00256 00257 RRHandlerRecord* rrHandler = new RRHandlerRecord; 00258 rrHandler->rrHandlerTask = handlerTask; 00259 rrHandler->rrHandlerClientData = clientData; 00260 if (fSpecificRRHandlerTable == NULL) { 00261 fSpecificRRHandlerTable = new AddressPortLookupTable; 00262 } 00263 fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler); 00264 } 00265 00266 void RTCPInstance 00267 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) { 00268 if (fSpecificRRHandlerTable == NULL) return; 00269 00270 RRHandlerRecord* rrHandler 00271 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort)); 00272 if (rrHandler != NULL) { 00273 fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort); 00274 delete rrHandler; 00275 } 00276 } 00277 00278 void RTCPInstance::setStreamSocket(int sockNum, 00279 unsigned char streamChannelId) { 00280 // Turn off background read handling: 00281 fRTCPInterface.stopNetworkReading(); 00282 00283 // Switch to RTCP-over-TCP: 00284 fRTCPInterface.setStreamSocket(sockNum, streamChannelId); 00285 00286 // Turn background reading back on: 00287 TaskScheduler::BackgroundHandlerProc* handler 00288 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler; 00289 fRTCPInterface.startNetworkReading(handler); 00290 } 00291 00292 void RTCPInstance::addStreamSocket(int sockNum, 00293 unsigned char streamChannelId) { 00294 // Add the RTCP-over-TCP interface: 00295 fRTCPInterface.setStreamSocket(sockNum, streamChannelId); 00296 00297 // Turn on background reading for this socket (in case it's not on already): 00298 TaskScheduler::BackgroundHandlerProc* handler 00299 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler; 00300 fRTCPInterface.startNetworkReading(handler); 00301 } 00302 00303 static unsigned const IP_UDP_HDR_SIZE = 28; 00304 // overhead (bytes) of IP and UDP hdrs 00305 00306 #define ADVANCE(n) pkt += (n); packetSize -= (n) 00307 00308 void RTCPInstance::incomingReportHandler(RTCPInstance* instance, 00309 int /*mask*/) { 00310 instance->incomingReportHandler1(); 00311 } 00312 00313 void RTCPInstance::incomingReportHandler1() { 00314 unsigned char* pkt = fInBuf; 00315 unsigned packetSize; 00316 struct sockaddr_in fromAddress; 00317 int typeOfPacket = PACKET_UNKNOWN_TYPE; 00318 00319 do { 00320 if (!fRTCPInterface.handleRead(pkt, maxPacketSize, 00321 packetSize, fromAddress)) { 00322 break; 00323 } 00324 00325 // Ignore the packet if it was looped-back from ourself: 00326 if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) { 00327 // However, we still want to handle incoming RTCP packets from 00328 // *other processes* on the same machine. To distinguish this 00329 // case from a true loop-back, check whether we've just sent a 00330 // packet of the same size. (This check isn't perfect, but it seems 00331 // to be the best we can do.) 00332 if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) { 00333 // This is a true loop-back: 00334 fHaveJustSentPacket = False; 00335 break; // ignore this packet 00336 } 00337 } 00338 00339 if (fIsSSMSource) { 00340 // This packet was received via unicast. 'Reflect' it by resending 00341 // it to the multicast group. 00342 // NOTE: Denial-of-service attacks are possible here. 00343 // Users of this software may wish to add their own, 00344 // application-specific mechanism for 'authenticating' the 00345 // validity of this packet before relecting it. 00346 fRTCPInterface.sendPacket(pkt, packetSize); 00347 fHaveJustSentPacket = True; 00348 fLastPacketSentSize = packetSize; 00349 } 00350 00351 #ifdef DEBUG 00352 fprintf(stderr, "[%p]saw incoming RTCP packet (from address %s, port %d)\n", this, our_inet_ntoa(fromAddress.sin_addr), ntohs(fromAddress.sin_port)); 00353 unsigned char* p = pkt; 00354 for (unsigned i = 0; i < packetSize; ++i) { 00355 if (i%4 == 0) fprintf(stderr, " "); 00356 fprintf(stderr, "%02x", p[i]); 00357 } 00358 fprintf(stderr, "\n"); 00359 #endif 00360 int totPacketSize = IP_UDP_HDR_SIZE + packetSize; 00361 00362 // Check the RTCP packet for validity: 00363 // It must at least contain a header (4 bytes), and this header 00364 // must be version=2, with no padding bit, and a payload type of 00365 // SR (200) or RR (201): 00366 if (packetSize < 4) break; 00367 unsigned rtcpHdr = ntohl(*(unsigned*)pkt); 00368 if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) { 00369 #ifdef DEBUG 00370 fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr); 00371 #endif 00372 break; 00373 } 00374 00375 // Process each of the individual RTCP 'subpackets' in (what may be) 00376 // a compound RTCP packet. 00377 unsigned reportSenderSSRC = 0; 00378 Boolean packetOK = False; 00379 while (1) { 00380 unsigned rc = (rtcpHdr>>24)&0x1F; 00381 unsigned pt = (rtcpHdr>>16)&0xFF; 00382 unsigned length = 4*(rtcpHdr&0xFFFF); // doesn't count hdr 00383 ADVANCE(4); // skip over the header 00384 if (length > packetSize) break; 00385 00386 // Assume that each RTCP subpacket begins with a 4-byte SSRC: 00387 if (length < 4) break; length -= 4; 00388 reportSenderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4); 00389 00390 Boolean subPacketOK = False; 00391 switch (pt) { 00392 case RTCP_PT_SR: { 00393 #ifdef DEBUG 00394 fprintf(stderr, "SR\n"); 00395 #endif 00396 if (length < 20) break; length -= 20; 00397 00398 // Extract the NTP timestamp, and note this: 00399 unsigned NTPmsw = ntohl(*(unsigned*)pkt); ADVANCE(4); 00400 unsigned NTPlsw = ntohl(*(unsigned*)pkt); ADVANCE(4); 00401 unsigned rtpTimestamp = ntohl(*(unsigned*)pkt); ADVANCE(4); 00402 if (fSource != NULL) { 00403 RTPReceptionStatsDB& receptionStats 00404 = fSource->receptionStatsDB(); 00405 receptionStats.noteIncomingSR(reportSenderSSRC, 00406 NTPmsw, NTPlsw, rtpTimestamp); 00407 } 00408 ADVANCE(8); // skip over packet count, octet count 00409 00410 // If a 'SR handler' was set, call it now: 00411 if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData); 00412 00413 // The rest of the SR is handled like a RR (so, no "break;" here) 00414 } 00415 case RTCP_PT_RR: { 00416 #ifdef DEBUG 00417 fprintf(stderr, "RR\n"); 00418 #endif 00419 unsigned reportBlocksSize = rc*(6*4); 00420 if (length < reportBlocksSize) break; 00421 length -= reportBlocksSize; 00422 00423 if (fSink != NULL) { 00424 // Use this information to update stats about our transmissions: 00425 RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB(); 00426 for (unsigned i = 0; i < rc; ++i) { 00427 unsigned senderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4); 00428 // We care only about reports about our own transmission, not others' 00429 if (senderSSRC == fSink->SSRC()) { 00430 unsigned lossStats = ntohl(*(unsigned*)pkt); ADVANCE(4); 00431 unsigned highestReceived = ntohl(*(unsigned*)pkt); ADVANCE(4); 00432 unsigned jitter = ntohl(*(unsigned*)pkt); ADVANCE(4); 00433 unsigned timeLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4); 00434 unsigned timeSinceLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4); 00435 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress, 00436 lossStats, 00437 highestReceived, jitter, 00438 timeLastSR, timeSinceLastSR); 00439 } else { 00440 ADVANCE(4*5); 00441 } 00442 } 00443 } else { 00444 ADVANCE(reportBlocksSize); 00445 } 00446 00447 if (pt == RTCP_PT_RR) { // i.e., we didn't fall through from 'SR' 00448 // If a 'RR handler' was set, call it now: 00449 00450 // Specific RR handler: 00451 if (fSpecificRRHandlerTable != NULL) { 00452 netAddressBits fromAddr = fromAddress.sin_addr.s_addr; 00453 Port fromPort(ntohs(fromAddress.sin_port)); 00454 RRHandlerRecord* rrHandler 00455 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort)); 00456 if (rrHandler != NULL) { 00457 if (rrHandler->rrHandlerTask != NULL) { 00458 (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData); 00459 } 00460 } 00461 } 00462 00463 // General RR handler: 00464 if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData); 00465 } 00466 00467 subPacketOK = True; 00468 typeOfPacket = PACKET_RTCP_REPORT; 00469 break; 00470 } 00471 case RTCP_PT_BYE: { 00472 #ifdef DEBUG 00473 fprintf(stderr, "BYE\n"); 00474 #endif 00475 // If a 'BYE handler' was set, call it now: 00476 TaskFunc* byeHandler = fByeHandlerTask; 00477 if (byeHandler != NULL 00478 && (!fByeHandleActiveParticipantsOnly 00479 || (fSource != NULL 00480 && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL) 00481 || (fSink != NULL 00482 && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) { 00483 fByeHandlerTask = NULL; 00484 // we call this only once by default 00485 (*byeHandler)(fByeHandlerClientData); 00486 } 00487 00488 // We should really check for & handle >1 SSRCs being present ##### 00489 00490 subPacketOK = True; 00491 typeOfPacket = PACKET_BYE; 00492 break; 00493 } 00494 // Later handle SDES, APP, and compound RTCP packets ##### 00495 default: 00496 #ifdef DEBUG 00497 fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt); 00498 #endif 00499 subPacketOK = True; 00500 break; 00501 } 00502 if (!subPacketOK) break; 00503 00504 // need to check for (& handle) SSRC collision! ##### 00505 00506 #ifdef DEBUG 00507 fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC); 00508 #endif 00509 00510 // Skip over any remaining bytes in this subpacket: 00511 ADVANCE(length); 00512 00513 // Check whether another RTCP 'subpacket' follows: 00514 if (packetSize == 0) { 00515 packetOK = True; 00516 break; 00517 } else if (packetSize < 4) { 00518 #ifdef DEBUG 00519 fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize); 00520 #endif 00521 break; 00522 } 00523 rtcpHdr = ntohl(*(unsigned*)pkt); 00524 if ((rtcpHdr & 0xC0000000) != 0x80000000) { 00525 #ifdef DEBUG 00526 fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr); 00527 #endif 00528 break; 00529 } 00530 } 00531 00532 if (!packetOK) { 00533 #ifdef DEBUG 00534 fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr); 00535 #endif 00536 break; 00537 } else { 00538 #ifdef DEBUG 00539 fprintf(stderr, "validated entire RTCP packet\n"); 00540 #endif 00541 } 00542 00543 onReceive(typeOfPacket, totPacketSize, reportSenderSSRC); 00544 } while (0); 00545 } 00546 00547 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize, 00548 unsigned ssrc) { 00549 fTypeOfPacket = typeOfPacket; 00550 fLastReceivedSize = totPacketSize; 00551 fLastReceivedSSRC = ssrc; 00552 00553 int members = (int)numMembers(); 00554 int senders = (fSink != NULL) ? 1 : 0; 00555 00556 OnReceive(this, // p 00557 this, // e 00558 &members, // members 00559 &fPrevNumMembers, // pmembers 00560 &senders, // senders 00561 &fAveRTCPSize, // avg_rtcp_size 00562 &fPrevReportTime, // tp 00563 dTimeNow(), // tc 00564 fNextReportTime); 00565 } 00566 00567 void RTCPInstance::sendReport() { 00568 #ifdef DEBUG 00569 fprintf(stderr, "sending REPORT\n"); 00570 #endif 00571 // Begin by including a SR and/or RR report: 00572 addReport(); 00573 00574 // Then, include a SDES: 00575 addSDES(); 00576 00577 // Send the report: 00578 sendBuiltPacket(); 00579 00580 // Periodically clean out old members from our SSRC membership database: 00581 const unsigned membershipReapPeriod = 5; 00582 if ((++fOutgoingReportCount) % membershipReapPeriod == 0) { 00583 unsigned threshold = fOutgoingReportCount - membershipReapPeriod; 00584 fKnownMembers->reapOldMembers(threshold); 00585 } 00586 } 00587 00588 void RTCPInstance::sendBYE() { 00589 #ifdef DEBUG 00590 fprintf(stderr, "sending BYE\n"); 00591 #endif 00592 // The packet must begin with a SR and/or RR report: 00593 addReport(); 00594 00595 addBYE(); 00596 sendBuiltPacket(); 00597 } 00598 00599 void RTCPInstance::sendBuiltPacket() { 00600 #ifdef DEBUG 00601 fprintf(stderr, "sending RTCP packet\n"); 00602 unsigned char* p = fOutBuf->packet(); 00603 for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) { 00604 if (i%4 == 0) fprintf(stderr," "); 00605 fprintf(stderr, "%02x", p[i]); 00606 } 00607 fprintf(stderr, "\n"); 00608 #endif 00609 unsigned reportSize = fOutBuf->curPacketSize(); 00610 fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize); 00611 fOutBuf->resetOffset(); 00612 00613 fLastSentSize = IP_UDP_HDR_SIZE + reportSize; 00614 fHaveJustSentPacket = True; 00615 fLastPacketSentSize = reportSize; 00616 } 00617 00618 int RTCPInstance::checkNewSSRC() { 00619 return fKnownMembers->noteMembership(fLastReceivedSSRC, 00620 fOutgoingReportCount); 00621 } 00622 00623 void RTCPInstance::removeLastReceivedSSRC() { 00624 removeSSRC(fLastReceivedSSRC, False/*keep stats around*/); 00625 } 00626 00627 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) { 00628 fKnownMembers->remove(ssrc); 00629 00630 if (alsoRemoveStats) { 00631 // Also, remove records of this SSRC from any reception or transmission stats 00632 if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc); 00633 if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc); 00634 } 00635 } 00636 00637 void RTCPInstance::onExpire(RTCPInstance* instance) { 00638 instance->onExpire1(); 00639 } 00640 00641 // Member functions to build specific kinds of report: 00642 00643 void RTCPInstance::addReport() { 00644 // Include a SR or a RR, depending on whether we 00645 // have an associated sink or source: 00646 if (fSink != NULL) { 00647 addSR(); 00648 } else if (fSource != NULL) { 00649 addRR(); 00650 } 00651 } 00652 00653 void RTCPInstance::addSR() { 00654 // ASSERT: fSink != NULL 00655 00656 enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(), 00657 5 /* extra words in a SR */); 00658 00659 // Now, add the 'sender info' for our sink 00660 00661 // Insert the NTP and RTP timestamps for the 'wallclock time': 00662 struct timeval timeNow; 00663 gettimeofday(&timeNow, NULL); 00664 fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80); 00665 // NTP timestamp most-significant word (1970 epoch -> 1900 epoch) 00666 double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000; // 2^32/10^6 00667 fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5)); 00668 // NTP timestamp least-significant word 00669 unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow); 00670 fOutBuf->enqueueWord(rtpTimestamp); // RTP ts 00671 00672 // Insert the packet and byte counts: 00673 fOutBuf->enqueueWord(fSink->packetCount()); 00674 fOutBuf->enqueueWord(fSink->octetCount()); 00675 00676 enqueueCommonReportSuffix(); 00677 } 00678 00679 void RTCPInstance::addRR() { 00680 // ASSERT: fSource != NULL 00681 00682 enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC()); 00683 enqueueCommonReportSuffix(); 00684 } 00685 00686 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType, 00687 unsigned SSRC, 00688 unsigned numExtraWords) { 00689 unsigned numReportingSources; 00690 if (fSource == NULL) { 00691 numReportingSources = 0; // we don't receive anything 00692 } else { 00693 RTPReceptionStatsDB& allReceptionStats 00694 = fSource->receptionStatsDB(); 00695 numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset(); 00696 // This must be <32, to fit in 5 bits: 00697 if (numReportingSources >= 32) { numReportingSources = 32; } 00698 // Later: support adding more reports to handle >32 sources (unlikely)##### 00699 } 00700 00701 unsigned rtcpHdr = 0x80000000; // version 2, no padding 00702 rtcpHdr |= (numReportingSources<<24); 00703 rtcpHdr |= (packetType<<16); 00704 rtcpHdr |= (1 + numExtraWords + 6*numReportingSources); 00705 // each report block is 6 32-bit words long 00706 fOutBuf->enqueueWord(rtcpHdr); 00707 00708 fOutBuf->enqueueWord(SSRC); 00709 } 00710 00711 void RTCPInstance::enqueueCommonReportSuffix() { 00712 // Output the report blocks for each source: 00713 if (fSource != NULL) { 00714 RTPReceptionStatsDB& allReceptionStats 00715 = fSource->receptionStatsDB(); 00716 00717 RTPReceptionStatsDB::Iterator iterator(allReceptionStats); 00718 while (1) { 00719 RTPReceptionStats* receptionStats = iterator.next(); 00720 if (receptionStats == NULL) break; 00721 enqueueReportBlock(receptionStats); 00722 } 00723 00724 allReceptionStats.reset(); // because we have just generated a report 00725 } 00726 } 00727 00728 void 00729 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) { 00730 fOutBuf->enqueueWord(stats->SSRC()); 00731 00732 unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived(); 00733 00734 unsigned totNumExpected 00735 = highestExtSeqNumReceived - stats->baseExtSeqNumReceived(); 00736 int totNumLost = totNumExpected - stats->totNumPacketsReceived(); 00737 // 'Clamp' this loss number to a 24-bit signed value: 00738 if (totNumLost > 0x007FFFFF) { 00739 totNumLost = 0x007FFFFF; 00740 } else if (totNumLost < 0) { 00741 if (totNumLost < -0x00800000) totNumLost = 0x00800000; // unlikely, but... 00742 totNumLost &= 0x00FFFFFF; 00743 } 00744 00745 unsigned numExpectedSinceLastReset 00746 = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived(); 00747 int numLostSinceLastReset 00748 = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset(); 00749 unsigned char lossFraction; 00750 if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) { 00751 lossFraction = 0; 00752 } else { 00753 lossFraction = (unsigned char) 00754 ((numLostSinceLastReset << 8) / numExpectedSinceLastReset); 00755 } 00756 00757 fOutBuf->enqueueWord((lossFraction<<24) | totNumLost); 00758 fOutBuf->enqueueWord(highestExtSeqNumReceived); 00759 00760 fOutBuf->enqueueWord(stats->jitter()); 00761 00762 unsigned NTPmsw = stats->lastReceivedSR_NTPmsw(); 00763 unsigned NTPlsw = stats->lastReceivedSR_NTPlsw(); 00764 unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16); // middle 32 bits 00765 fOutBuf->enqueueWord(LSR); 00766 00767 // Figure out how long has elapsed since the last SR rcvd from this src: 00768 struct timeval const& LSRtime = stats->lastReceivedSR_time(); // "last SR" 00769 struct timeval timeNow, timeSinceLSR; 00770 gettimeofday(&timeNow, NULL); 00771 if (timeNow.tv_usec < LSRtime.tv_usec) { 00772 timeNow.tv_usec += 1000000; 00773 timeNow.tv_sec -= 1; 00774 } 00775 timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec; 00776 timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec; 00777 // The enqueued time is in units of 1/65536 seconds. 00778 // (Note that 65536/1000000 == 1024/15625) 00779 unsigned DLSR; 00780 if (LSR == 0) { 00781 DLSR = 0; 00782 } else { 00783 DLSR = (timeSinceLSR.tv_sec<<16) 00784 | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF); 00785 } 00786 fOutBuf->enqueueWord(DLSR); 00787 } 00788 00789 void RTCPInstance::addSDES() { 00790 // For now we support only the CNAME item; later support more ##### 00791 00792 // Begin by figuring out the size of the entire SDES report: 00793 unsigned numBytes = 4; 00794 // counts the SSRC, but not the header; it'll get subtracted out 00795 numBytes += fCNAME.totalSize(); // includes id and length 00796 numBytes += 1; // the special END item 00797 00798 unsigned num4ByteWords = (numBytes + 3)/4; 00799 00800 unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC chunk 00801 rtcpHdr |= (RTCP_PT_SDES<<16); 00802 rtcpHdr |= num4ByteWords; 00803 fOutBuf->enqueueWord(rtcpHdr); 00804 00805 if (fSource != NULL) { 00806 fOutBuf->enqueueWord(fSource->SSRC()); 00807 } else if (fSink != NULL) { 00808 fOutBuf->enqueueWord(fSink->SSRC()); 00809 } 00810 00811 // Add the CNAME: 00812 fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize()); 00813 00814 // Add the 'END' item (i.e., a zero byte), plus any more needed to pad: 00815 unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4); 00816 unsigned char const zero = '\0'; 00817 while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1); 00818 } 00819 00820 void RTCPInstance::addBYE() { 00821 unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC 00822 rtcpHdr |= (RTCP_PT_BYE<<16); 00823 rtcpHdr |= 1; // 2 32-bit words total (i.e., with 1 SSRC) 00824 fOutBuf->enqueueWord(rtcpHdr); 00825 00826 if (fSource != NULL) { 00827 fOutBuf->enqueueWord(fSource->SSRC()); 00828 } else if (fSink != NULL) { 00829 fOutBuf->enqueueWord(fSink->SSRC()); 00830 } 00831 } 00832 00833 void RTCPInstance::schedule(double nextTime) { 00834 fNextReportTime = nextTime; 00835 00836 double secondsToDelay = nextTime - dTimeNow(); 00837 #ifdef DEBUG 00838 fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime); 00839 #endif 00840 int usToGo = (int)(secondsToDelay * 1000000); 00841 nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo, 00842 (TaskFunc*)RTCPInstance::onExpire, this); 00843 } 00844 00845 void RTCPInstance::reschedule(double nextTime) { 00846 envir().taskScheduler().unscheduleDelayedTask(nextTask()); 00847 schedule(nextTime); 00848 } 00849 00850 void RTCPInstance::onExpire1() { 00851 // Note: fTotSessionBW is kbits per second 00852 double rtcpBW = 0.05*fTotSessionBW*1024/8; // -> bytes per second 00853 00854 OnExpire(this, // event 00855 numMembers(), // members 00856 (fSink != NULL) ? 1 : 0, // senders 00857 rtcpBW, // rtcp_bw 00858 (fSink != NULL) ? 1 : 0, // we_sent 00859 &fAveRTCPSize, // ave_rtcp_size 00860 &fIsInitial, // initial 00861 dTimeNow(), // tc 00862 &fPrevReportTime, // tp 00863 &fPrevNumMembers // pmembers 00864 ); 00865 } 00866 00868 00869 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) { 00870 unsigned length = strlen((char const*)value); 00871 if (length > 511) length = 511; 00872 00873 fData[0] = tag; 00874 fData[1] = (unsigned char)length; 00875 memmove(&fData[2], value, length); 00876 00877 // Pad the trailing bytes to a 4-byte boundary: 00878 while ((length)%4 > 0) fData[2 + length++] = '\0'; 00879 } 00880 00881 unsigned SDESItem::totalSize() const { 00882 return 2 + (unsigned)fData[1]; 00883 } 00884 00885 00887 00888 extern "C" void Schedule(double nextTime, event e) { 00889 RTCPInstance* instance = (RTCPInstance*)e; 00890 if (instance == NULL) return; 00891 00892 instance->schedule(nextTime); 00893 } 00894 00895 extern "C" void Reschedule(double nextTime, event e) { 00896 RTCPInstance* instance = (RTCPInstance*)e; 00897 if (instance == NULL) return; 00898 00899 instance->reschedule(nextTime); 00900 } 00901 00902 extern "C" void SendRTCPReport(event e) { 00903 RTCPInstance* instance = (RTCPInstance*)e; 00904 if (instance == NULL) return; 00905 00906 instance->sendReport(); 00907 } 00908 00909 extern "C" void SendBYEPacket(event e) { 00910 RTCPInstance* instance = (RTCPInstance*)e; 00911 if (instance == NULL) return; 00912 00913 instance->sendBYE(); 00914 } 00915 00916 extern "C" int TypeOfEvent(event e) { 00917 RTCPInstance* instance = (RTCPInstance*)e; 00918 if (instance == NULL) return EVENT_UNKNOWN; 00919 00920 return instance->typeOfEvent(); 00921 } 00922 00923 extern "C" int SentPacketSize(event e) { 00924 RTCPInstance* instance = (RTCPInstance*)e; 00925 if (instance == NULL) return 0; 00926 00927 return instance->sentPacketSize(); 00928 } 00929 00930 extern "C" int PacketType(packet p) { 00931 RTCPInstance* instance = (RTCPInstance*)p; 00932 if (instance == NULL) return PACKET_UNKNOWN_TYPE; 00933 00934 return instance->packetType(); 00935 } 00936 00937 extern "C" int ReceivedPacketSize(packet p) { 00938 RTCPInstance* instance = (RTCPInstance*)p; 00939 if (instance == NULL) return 0; 00940 00941 return instance->receivedPacketSize(); 00942 } 00943 00944 extern "C" int NewMember(packet p) { 00945 RTCPInstance* instance = (RTCPInstance*)p; 00946 if (instance == NULL) return 0; 00947 00948 return instance->checkNewSSRC(); 00949 } 00950 00951 extern "C" int NewSender(packet /*p*/) { 00952 return 0; // we don't yet recognize senders other than ourselves ##### 00953 } 00954 00955 extern "C" void AddMember(packet /*p*/) { 00956 // Do nothing; all of the real work was done when NewMember() was called 00957 } 00958 00959 extern "C" void AddSender(packet /*p*/) { 00960 // we don't yet recognize senders other than ourselves ##### 00961 } 00962 00963 extern "C" void RemoveMember(packet p) { 00964 RTCPInstance* instance = (RTCPInstance*)p; 00965 if (instance == NULL) return; 00966 00967 instance->removeLastReceivedSSRC(); 00968 } 00969 00970 extern "C" void RemoveSender(packet /*p*/) { 00971 // we don't yet recognize senders other than ourselves ##### 00972 } 00973 00974 extern "C" double drand30() { 00975 unsigned tmp = our_random()&0x3FFFFFFF; // a random 30-bit integer 00976 return tmp/(double)(1024*1024*1024); 00977 }
1.7.6.1